return folder_content, "text/plain"
# TODO manage folders in http
else:
- return self.fs.file_open((storage['folder'], storage['pkg-dir'], *path), "rb"), \
- "application/octet-stream"
+ return self.fs.file_open((storage['folder'], storage['pkg-dir'], *path), "rb"),\
+ "application/octet-stream"
# pkgtype accept ZIP TEXT -> result
# manyfiles yes X -> zip
try:
user_passwd = standard_b64decode(user_passwd64).decode()
user, _, passwd = user_passwd.partition(":")
- except:
+ except Exception:
pass
outdata = self.engine.new_token(None, {"username": user, "password": passwd})
token = outdata["id"]
elif format_yaml:
try:
kwargs[k] = yaml.load(v)
- except:
+ except Exception:
pass
elif k.endswith(".gt") or k.endswith(".lt") or k.endswith(".gte") or k.endswith(".lte"):
try:
kwargs[k] = int(v)
- except:
+ except Exception:
try:
kwargs[k] = float(v)
- except:
+ except Exception:
pass
elif v.find(",") > 0:
kwargs[k] = v.split(",")
elif format_yaml:
try:
v[index] = yaml.load(v[index])
- except:
+ except Exception:
pass
return indata
elif method == "POST":
try:
session = self._authorization()
- except:
+ except Exception:
session = None
if kwargs:
indata.update(kwargs)
thread_info = None
if args and args[0] == "help":
return "<html><pre>\ninit\nfile/<name> download file\ndb-clear/table\nprune\nlogin\nlogin2\n"\
- "sleep/<time>\nmessage/topic\n</pre></html>"
+ "sleep/<time>\nmessage/topic\n</pre></html>"
elif args and args[0] == "init":
try:
for k, v in environ.items():
if not k.startswith("OSMNBI_"):
continue
- k1, _, k2 = k[7:].lower().partition("_")
+ k1, _, k2 = k[7:].lower().partition("_")
if not k2:
continue
try:
+++ /dev/null
-nsd-catalog:
- nsd:
- - id: cirros_nsd
- name: cirros_ns
- short-name: cirros_ns
- description: Generated by OSM pacakage generator
- vendor: OSM
- version: '1.0'
-
- # Place the logo as png in icons directory and provide the name here
- logo: osm_2x.png
-
- # Specify the VNFDs that are part of this NSD
- constituent-vnfd:
- # The member-vnf-index needs to be unique, starting from 1
- # vnfd-id-ref is the id of the VNFD
- # Multiple constituent VNFDs can be specified
- - member-vnf-index: 1
- vnfd-id-ref: cirros_vnfd
- scaling-group-descriptor:
- - name: "scaling_cirros"
- vnfd-member:
- - count: 1
- member-vnf-index-ref: 1
- min-instance-count: 0
- max-instance-count: 10
- scaling-policy:
- - scaling-type: "manual"
- cooldown-time: 10
- threshold-time: 10
- name: manual_scale
- vld:
- # Networks for the VNFs
- - id: cirros_nsd_vld1
- name: cirros_nsd_vld1
- type: ELAN
- mgmt-network: 'true'
- # vim-network-name: <update>
- # provider-network:
- # segmentation_id: <update>
- vnfd-connection-point-ref:
- # Specify the constituent VNFs
- # member-vnf-index-ref - entry from constituent vnf
- # vnfd-id-ref - VNFD id
- # vnfd-connection-point-ref - connection point name in the VNFD
- - member-vnf-index-ref: 1
- vnfd-id-ref: cirros_vnfd
- # NOTE: Validate the entry below
- vnfd-connection-point-ref: eth0
+++ /dev/null
-vnfd-catalog:
- vnfd:
- - id: cirros_vnfd
- name: cirros_vnf
- short-name: cirros_vnf
- description: Simple VNF example with a cirros
- vendor: OSM
- version: '1.0'
-
- # Place the logo as png in icons directory and provide the name here
- logo: cirros-64.png
-
- # Management interface
- mgmt-interface:
- cp: eth0
-
- # Atleast one VDU need to be specified
- vdu:
- - id: cirros_vnfd-VM
- name: cirros_vnfd-VM
- description: cirros_vnfd-VM
- count: 1
-
- # Flavour of the VM to be instantiated for the VDU
- # flavor below can fit into m1.micro
- vm-flavor:
- vcpu-count: 1
- memory-mb: 256
- storage-gb: 2
-
- # Image/checksum or image including the full path
- image: 'cirros034'
- #checksum:
-
- interface:
- # Specify the external interfaces
- # There can be multiple interfaces defined
- - name: eth0
- type: EXTERNAL
- virtual-interface:
- type: VIRTIO
- bandwidth: '0'
- vpci: 0000:00:0a.0
- external-connection-point-ref: eth0
-
- connection-point:
- - name: eth0
- type: VPORT
+++ /dev/null
-#! /bin/bash
-
-export NBI_URL=https://localhost:9999/osm
-USERNAME=admin
-PASSWORD=admin
-PROJECT=admin
-VIM=ost2-mrt-tid #OST2_MRT #ost2-mrt-tid
-
-DESCRIPTORS=/home/ubuntu/descriptors #../local/descriptors
-DESCRIPTORS=../local/descriptors
-
-VNFD1=${DESCRIPTORS}/ping_vnf.tar.gz
-VNFD2=${DESCRIPTORS}/pong_vnf.tar.gz
-VNFD3=${DESCRIPTORS}/cirros_vnfd.yaml
-
-NSD1=${DESCRIPTORS}/ping_pong_ns.tar.gz
-NSD2=${DESCRIPTORS}/cirros_2vnf_ns.tar.gz
-NSD3=${DESCRIPTORS}/cirros_nsd.yaml
-
-[ -f "$VNFD1" ] || ! echo "not found ping_vnf.tar.gz. Set DESCRIPTORS variable to a proper location" || exit 1
-[ -f "$VNFD2" ] || ! echo "not found pong_vnf.tar.gz. Set DESCRIPTORS variable to a proper location" || exit 1
-[ -f "$VNFD3" ] || ! echo "not found cirros_vnfd.yaml. Set DESCRIPTORS variable to a proper location" || exit 1
-[ -f "$NSD1" ] || ! echo "not found ping_pong_ns.tar.gz. Set DESCRIPTORS variable to a proper location" || exit 1
-[ -f "$NSD2" ] || ! echo "not found cirros_2vnf_ns.tar.gz. Set DESCRIPTORS variable to a proper location" || exit 1
-[ -f "$NSD3" ] || ! echo "not found cirros_nsd.yaml. Set DESCRIPTORS variable to a proper location" || exit 1
-
-#get token
-TOKEN=`curl --insecure -H "Content-Type: application/yaml" -H "Accept: application/yaml" \
- --data "{username: $USERNAME, password: $PASSWORD, project_id: $PROJECT}" ${NBI_URL}/admin/v1/tokens \
- 2>/dev/null | awk '($1=="id:"){print $2}'`;
-echo token: $TOKEN
-
-
-# VNFD
-#########
-#insert PKG
-VNFD1_ID=`curl --insecure -w "%{http_code}\n" -H "Content-Type: application/gzip" -H "Accept: application/yaml" \
- -H "Authorization: Bearer $TOKEN" --data-binary "@$VNFD1" ${NBI_URL}/vnfpkgm/v1/vnf_packages_content \
- 2>/dev/null | awk '($1=="id:"){print $2}'`
-echo ping_vnfd: $VNFD1_ID
-
-VNFD2_ID=`curl --insecure -w "%{http_code}\n" -H "Content-Type: application/gzip" -H "Accept: application/yaml" \
- -H "Authorization: Bearer $TOKEN" --data-binary "@$VNFD2" ${NBI_URL}/vnfpkgm/v1/vnf_packages_content \
- 2>/dev/null | awk '($1=="id:"){print $2}'`
-echo pong_vnfd: $VNFD2_ID
-
-
-
-# NSD
-#########
-#insert PKG
-NSD1_ID=`curl --insecure -w "%{http_code}\n" -H "Content-Type: application/gzip" -H "Accept: application/yaml" \
- -H "Authorization: Bearer $TOKEN" --data-binary "@$NSD1" ${NBI_URL}/nsd/v1/ns_descriptors_content \
- 2>/dev/null | awk '($1=="id:"){print $2}'`
-echo ping_pong_nsd: $NSD1_ID
-
-
-# NSRS
-##############
-#add nsr
-NSR1_ID=`curl --insecure -w "%{http_code}\n" -H "Content-Type: application/yaml" -H "Accept: application/yaml" \
- -H "Authorization: Bearer $TOKEN" --data "{ nsDescription: default description, nsName: NSNAME, nsdId: $NSD1_ID, \
- ssh-authorized-key: [ {key-pair-ref: gerardo}, {key-pair-ref: alfonso}], vimAccountId: $VIM }" \
- ${NBI_URL}/nslcm/v1/ns_instances_content 2>/dev/null | awk '($1=="id:"){print $2}'` ;
-echo ping_pong_nsr: $NSR1_ID
-
-
-echo 'curl --insecure -w "%{http_code}\n" -H "Content-Type: application/yaml" -H "Accept: application/yaml"' \
- '-H "Authorization: Bearer '$TOKEN'" '${NBI_URL}'/nslcm/v1/ns_instances_content/'$NSR1_ID' 2>/dev/null | ' \
- 'grep -e detailed-status -e operational-status -e config-status'
-
-
-
-
+++ /dev/null
-#! /bin/bash
-
-export NBI_URL=https://localhost:9999/osm
-USERNAME=admin
-PASSWORD=admin
-PROJECT=admin
-
-
-
-#get token
-TOKEN=`curl --insecure -H "Content-Type: application/yaml" -H "Accept: application/yaml" --data "{username: $USERNAME, password: $PASSWORD, project_id: $PROJECT}" ${NBI_URL}/admin/v1/tokens 2>/dev/null | awk '($1=="id:"){print $2}' ` ; echo $TOKEN
-
-
-echo deleting all
-#DELETE ALL
-
-for url_item in nslcm/v1/ns_instances nsd/v1/ns_descriptors_content vnfpkgm/v1/vnf_packages_content
-do
- for ITEM_ID in `curl --insecure -w "%{http_code}\n" -H "Content-Type: application/yaml" -H "Accept: application/yaml" -H "Authorization: Bearer $TOKEN" ${NBI_URL}/${url_item} 2>/dev/null | awk '($1=="_id:") {print $2}'` ;
- do
- curl --insecure -w "%{http_code}\n" -H "Content-Type: application/yaml" -H "Accept: application/yaml" -H "Authorization: Bearer $TOKEN" ${NBI_URL}/${url_item}/$ITEM_ID -X DELETE
- done
-done
-
-# curl --insecure ${NBI_URL}/test/prune
-
+++ /dev/null
-#! /usr/bin/python3
-# -*- coding: utf-8 -*-
-
-import getopt
-import sys
-import requests
-import json
-import logging
-import yaml
-# import json
-import tarfile
-from os import makedirs
-
-__author__ = "Alfonso Tierno, alfonso.tiernosepulveda@telefonica.com"
-__date__ = "$2018-03-01$"
-__version__ = "0.1"
-version_date = "Mar 2018"
-
-
-def usage():
- print("Usage: ", sys.argv[0], "[options]")
- print(" --version: prints current version")
- print(" -f|--file FILE: file to be sent")
- print(" -h|--help: shows this help")
- print(" -u|--url URL: complete server URL")
- print(" -s|--chunk-size SIZE: size of chunks, by default 1000")
- print(" -t|--token TOKEN: Authorizaton token, previously obtained from server")
- print(" -v|--verbose print debug information, can be used several times")
- return
-
-
-r_header_json = {"Content-type": "application/json"}
-headers_json = {
- "Content-type": "application/json",
- "Accept": "application/json",
-}
-r_header_yaml = {"Content-type": "application/yaml"}
-headers_yaml = {
- "Content-type": "application/yaml",
- "Accept": "application/yaml",
-}
-r_header_text = {"Content-type": "text/plain"}
-r_header_octect = {"Content-type": "application/octet-stream"}
-headers_text = {
- "Accept": "text/plain",
-}
-r_header_zip = {"Content-type": "application/zip"}
-headers_zip = {
- "Accept": "application/zip",
-}
-# test without authorization
-test_not_authorized_list = (
- ("NA1", "Invalid token", "GET", "/admin/v1/users", headers_json, None, 401, r_header_json, "json"),
- ("NA2", "Invalid URL", "POST", "/admin/v1/nonexist", headers_yaml, None, 405, r_header_yaml, "yaml"),
- ("NA3", "Invalid version", "DELETE", "/admin/v2/users", headers_yaml, None, 405, r_header_yaml, "yaml"),
-)
-
-# test ones authorized
-test_authorized_list = (
- ("AU1", "Invalid vnfd id", "GET", "/vnfpkgm/v1/vnf_packages/non-existing-id",
- headers_json, None, 404, r_header_json, "json"),
- ("AU2", "Invalid nsd id", "GET", "/nsd/v1/ns_descriptors/non-existing-id",
- headers_yaml, None, 404, r_header_yaml, "yaml"),
- ("AU3", "Invalid nsd id", "DELETE", "/nsd/v1/ns_descriptors_content/non-existing-id",
- headers_yaml, None, 404, r_header_yaml, "yaml"),
-)
-
-vim = {
- "schema_version": "1.0",
- "schema_type": "No idea",
- "name": "myVim",
- "description": "Descriptor name",
- "vim_type": "openstack",
- "vim_url": "http://localhost:/vim",
- "vim_tenant_name": "vimTenant",
- "vim_user": "user",
- "vim_password": "password",
- "config": {"config_param": 1}
-}
-
-vim_bad = vim.copy()
-vim_bad.pop("name")
-
-test_admin_list1 = (
- ("VIM1", "Create VIM", "POST", "/admin/v1/vim_accounts", headers_json, vim, (201, 204),
- {"Location": "/admin/v1/vim_accounts/", "Content-Type": "application/json"}, "json"),
- ("VIM2", "Create VIM bad schema", "POST", "/admin/v1/vim_accounts", headers_json, vim_bad, 422, None, headers_json),
- ("VIM2", "Create VIM name repeated", "POST", "/admin/v1/vim_accounts", headers_json, vim, 409, None, headers_json),
- ("VIM4", "Show VIMs", "GET", "/admin/v1/vim_accounts", headers_yaml, None, 200, r_header_yaml, "yaml"),
- ("VIM5", "Show VIM", "GET", "/admin/v1/vim_accounts/{VIM1}", headers_yaml, None, 200, r_header_yaml, "yaml"),
- ("VIM6", "Delete VIM", "DELETE", "/admin/v1/vim_accounts/{VIM1}", headers_yaml, None, 202, None, 0),
-)
-
-
-class TestException(Exception):
- pass
-
-
-class TestRest:
- def __init__(self, url_base, header_base={}, verify=False):
- self.url_base = url_base
- self.header_base = header_base
- self.s = requests.session()
- self.s.headers = header_base
- self.verify = verify
- # contains ID of tests obtained from Location response header. "" key contains last obtained id
- self.test_ids = {}
-
- def set_header(self, header):
- self.s.headers.update(header)
-
- def test(self, name, description, method, url, headers, payload, expected_codes, expected_headers,
- expected_payload):
- """
- Performs an http request and check http code response. Exit if different than allowed. It get the returned id
- that can be used by following test in the URL with {name} where name is the name of the test
- :param name: short name of the test
- :param description: description of the test
- :param method: HTTP method: GET,PUT,POST,DELETE,...
- :param url: complete URL or relative URL
- :param headers: request headers to add to the base headers
- :param payload: Can be a dict, transformed to json, a text or a file if starts with '@'
- :param expected_codes: expected response codes, can be int, int tuple or int range
- :param expected_headers: expected response headers, dict with key values
- :param expected_payload: expected payload, 0 if empty, 'yaml', 'json', 'text', 'zip'
- :return: requests response
- """
- try:
- if not self.s:
- self.s = requests.session()
- # URL
- if not url:
- url = self.url_base
- elif not url.startswith("http"):
- url = self.url_base + url
-
- var_start = url.find("{") + 1
- while var_start:
- var_end = url.find("}", var_start)
- if var_end == -1:
- break
- var_name = url[var_start:var_end]
- if var_name in self.test_ids:
- url = url[:var_start-1] + self.test_ids[var_name] + url[var_end+1:]
- var_start += len(self.test_ids[var_name])
- var_start = url.find("{", var_start) + 1
- if payload:
- if isinstance(payload, str):
- if payload.startswith("@"):
- mode = "r"
- file_name = payload[1:]
- if payload.startswith("@b"):
- mode = "rb"
- file_name = payload[2:]
- with open(file_name, mode) as f:
- payload = f.read()
- elif isinstance(payload, dict):
- payload = json.dumps(payload)
-
- test = "Test {} {} {} {}".format(name, description, method, url)
- logger.warning(test)
- stream = False
- # if expected_payload == "zip":
- # stream = True
- r = getattr(self.s, method.lower())(url, data=payload, headers=headers, verify=self.verify, stream=stream)
- logger.debug("RX {}: {}".format(r.status_code, r.text))
-
- # check response
- if expected_codes:
- if isinstance(expected_codes, int):
- expected_codes = (expected_codes,)
- if r.status_code not in expected_codes:
- raise TestException(
- "Got status {}. Expected {}. {}".format(r.status_code, expected_codes, r.text))
-
- if expected_headers:
- for header_key, header_val in expected_headers.items():
- if header_key.lower() not in r.headers:
- raise TestException("Header {} not present".format(header_key))
- if header_val and header_val.lower() not in r.headers[header_key]:
- raise TestException("Header {} does not contain {} but {}".format(header_key, header_val,
- r.headers[header_key]))
-
- if expected_payload is not None:
- if expected_payload == 0 and len(r.content) > 0:
- raise TestException("Expected empty payload")
- elif expected_payload == "json":
- try:
- r.json()
- except Exception as e:
- raise TestException("Expected json response payload, but got Exception {}".format(e))
- elif expected_payload == "yaml":
- try:
- yaml.safe_load(r.text)
- except Exception as e:
- raise TestException("Expected yaml response payload, but got Exception {}".format(e))
- elif expected_payload == "zip":
- if len(r.content) == 0:
- raise TestException("Expected some response payload, but got empty")
- # try:
- # tar = tarfile.open(None, 'r:gz', fileobj=r.raw)
- # for tarinfo in tar:
- # tarname = tarinfo.name
- # print(tarname)
- # except Exception as e:
- # raise TestException("Expected zip response payload, but got Exception {}".format(e))
- elif expected_payload == "text":
- if len(r.content) == 0:
- raise TestException("Expected some response payload, but got empty")
- # r.text
- location = r.headers.get("Location")
- if location:
- _id = location[location.rfind("/") + 1:]
- if _id:
- self.test_ids[name] = str(_id)
- self.test_ids[""] = str(_id) # last id
- return r
- except TestException as e:
- logger.error("{} \nRX code{}: {}".format(e, r.status_code, r.text))
- exit(1)
- except IOError as e:
- logger.error("Cannot open file {}".format(e))
- exit(1)
-
-
-if __name__ == "__main__":
- global logger
- test = ""
-
- # Disable warnings from self-signed certificates.
- requests.packages.urllib3.disable_warnings()
- try:
- logging.basicConfig(format="%(levelname)s %(message)s", level=logging.ERROR)
- logger = logging.getLogger('NBI')
- # load parameters and configuration
- opts, args = getopt.getopt(sys.argv[1:], "hvu:p:",
- ["url=", "user=", "password=", "help", "version", "verbose", "project=", "insecure"])
- url = "https://localhost:9999/osm"
- user = password = project = "admin"
- verbose = 0
- verify = True
-
- for o, a in opts:
- if o == "--version":
- print("test version " + __version__ + ' ' + version_date)
- sys.exit()
- elif o in ("-v", "--verbose"):
- verbose += 1
- elif o == "no-verbose":
- verbose = -1
- elif o in ("-h", "--help"):
- usage()
- sys.exit()
- elif o == "--url":
- url = a
- elif o in ("-u", "--user"):
- user = a
- elif o in ("-p", "--password"):
- password = a
- elif o == "--project":
- project = a
- elif o == "--insecure":
- verify = False
- else:
- assert False, "Unhandled option"
- if verbose == 0:
- logger.setLevel(logging.WARNING)
- elif verbose > 1:
- logger.setLevel(logging.DEBUG)
- else:
- logger.setLevel(logging.ERROR)
-
- test_rest = TestRest(url)
-
- # tests without authorization
- for t in test_not_authorized_list:
- test_rest.test(*t)
-
- # get token
- r = test_rest.test("token1", "Obtain token", "POST", "/admin/v1/tokens", headers_json,
- {"username": user, "password": password, "project_id": project},
- (200, 201), {"Content-Type": "application/json"}, "json")
- response = r.json()
- token = response["id"]
- test_rest.set_header({"Authorization": "Bearer {}".format(token)})
-
- # tests once authorized
- for t in test_authorized_list:
- test_rest.test(*t)
-
- # tests admin
- for t in test_admin_list1:
- test_rest.test(*t)
-
- # vnfd CREATE
- r = test_rest.test("VNFD1", "Onboard VNFD step 1", "POST", "/vnfpkgm/v1/vnf_packages", headers_json, None,
- 201, {"Location": "/vnfpkgm/v1/vnf_packages/", "Content-Type": "application/json"}, "json")
- location = r.headers["Location"]
- vnfd_id = location[location.rfind("/")+1:]
- # print(location, vnfd_id)
-
- # vnfd UPLOAD test
- r = test_rest.test("VNFD2", "Onboard VNFD step 2 as TEXT", "PUT",
- "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
- r_header_text, "@./cirros_vnf/cirros_vnfd.yaml", 204, None, 0)
-
- # vnfd SHOW OSM format
- r = test_rest.test("VNFD3", "Show VNFD OSM format", "GET",
- "/vnfpkgm/v1/vnf_packages_content/{}".format(vnfd_id),
- headers_json, None, 200, r_header_json, "json")
-
- # vnfd SHOW text
- r = test_rest.test("VNFD4", "Show VNFD SOL005 text", "GET",
- "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
- headers_text, None, 200, r_header_text, "text")
-
- # vnfd UPLOAD ZIP
- makedirs("temp", exist_ok=True)
- tar = tarfile.open("temp/cirros_vnf.tar.gz", "w:gz")
- tar.add("cirros_vnf")
- tar.close()
- r = test_rest.test("VNFD5", "Onboard VNFD step 3 replace with ZIP", "PUT",
- "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
- r_header_zip, "@b./temp/cirros_vnf.tar.gz", 204, None, 0)
-
- # vnfd SHOW OSM format
- r = test_rest.test("VNFD6", "Show VNFD OSM format", "GET",
- "/vnfpkgm/v1/vnf_packages_content/{}".format(vnfd_id),
- headers_json, None, 200, r_header_json, "json")
-
- # vnfd SHOW zip
- r = test_rest.test("VNFD7", "Show VNFD SOL005 zip", "GET",
- "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
- headers_zip, None, 200, r_header_zip, "zip")
- # vnfd SHOW descriptor
- r = test_rest.test("VNFD8", "Show VNFD descriptor", "GET",
- "/vnfpkgm/v1/vnf_packages/{}/vnfd".format(vnfd_id),
- headers_text, None, 200, r_header_text, "text")
- # vnfd SHOW actifact
- r = test_rest.test("VNFD9", "Show VNFD artifact", "GET",
- "/vnfpkgm/v1/vnf_packages/{}/artifacts/icons/cirros-64.png".format(vnfd_id),
- headers_text, None, 200, r_header_octect, "text")
-
- # # vnfd DELETE
- # r = test_rest.test("VNFD10", "Delete VNFD SOL005 text", "DELETE",
- # "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id),
- # headers_yaml, None, 204, None, 0)
-
- # nsd CREATE
- r = test_rest.test("NSD1", "Onboard NSD step 1", "POST", "/nsd/v1/ns_descriptors", headers_json, None,
- 201, {"Location": "/nsd/v1/ns_descriptors/", "Content-Type": "application/json"}, "json")
- location = r.headers["Location"]
- nsd_id = location[location.rfind("/")+1:]
- # print(location, nsd_id)
-
- # nsd UPLOAD test
- r = test_rest.test("NSD2", "Onboard NSD with missing vnfd", "PUT",
- "/nsd/v1/ns_descriptors/{}/nsd_content?constituent-vnfd.0.vnfd-id-ref"
- "=NONEXISTING-VNFD".format(nsd_id),
- r_header_text, "@./cirros_ns/cirros_nsd.yaml", 409, r_header_yaml, "yaml")
-
- # # VNF_CREATE
- # r = test_rest.test("VNFD5", "Onboard VNFD step 3 replace with ZIP", "PUT",
- # "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
- # r_header_zip, "@b./temp/cirros_vnf.tar.gz", 204, None, 0)
-
- r = test_rest.test("NSD2", "Onboard NSD step 2 as TEXT", "PUT",
- "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
- r_header_text, "@./cirros_ns/cirros_nsd.yaml", 204, None, 0)
-
- # nsd SHOW OSM format
- r = test_rest.test("NSD3", "Show NSD OSM format", "GET", "/nsd/v1/ns_descriptors_content/{}".format(nsd_id),
- headers_json, None, 200, r_header_json, "json")
-
- # nsd SHOW text
- r = test_rest.test("NSD4", "Show NSD SOL005 text", "GET",
- "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
- headers_text, None, 200, r_header_text, "text")
-
- # nsd UPLOAD ZIP
- makedirs("temp", exist_ok=True)
- tar = tarfile.open("temp/cirros_ns.tar.gz", "w:gz")
- tar.add("cirros_ns")
- tar.close()
- r = test_rest.test("NSD5", "Onboard NSD step 3 replace with ZIP", "PUT",
- "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
- r_header_zip, "@b./temp/cirros_ns.tar.gz", 204, None, 0)
-
- # nsd SHOW OSM format
- r = test_rest.test("NSD6", "Show NSD OSM format", "GET", "/nsd/v1/ns_descriptors_content/{}".format(nsd_id),
- headers_json, None, 200, r_header_json, "json")
-
- # nsd SHOW zip
- r = test_rest.test("NSD7", "Show NSD SOL005 zip", "GET", "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
- headers_zip, None, 200, r_header_zip, "zip")
-
- # nsd SHOW descriptor
- r = test_rest.test("NSD8", "Show NSD descriptor", "GET", "/nsd/v1/ns_descriptors/{}/nsd".format(nsd_id),
- headers_text, None, 200, r_header_text, "text")
- # nsd SHOW actifact
- r = test_rest.test("NSD9", "Show NSD artifact", "GET",
- "/nsd/v1/ns_descriptors/{}/artifacts/icons/osm_2x.png".format(nsd_id),
- headers_text, None, 200, r_header_octect, "text")
-
- # vnfd DELETE
- r = test_rest.test("VNFD10", "Delete VNFD conflict", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id),
- headers_yaml, None, 409, r_header_yaml, "yaml")
-
- # nsd DELETE
- r = test_rest.test("NSD10", "Delete NSD SOL005 text", "DELETE", "/nsd/v1/ns_descriptors/{}".format(nsd_id),
- headers_yaml, None, 204, None, 0)
-
- # vnfd DELETE
- r = test_rest.test("VNFD10", "Delete VNFD SOL005 text", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id),
- headers_yaml, None, 204, None, 0)
-
- print("PASS")
-
- except Exception as e:
- if test:
- logger.error(test + " Exception: " + str(e))
- exit(1)
- else:
- logger.critical(test + " Exception: " + str(e), exc_info=True)
+++ /dev/null
-#! /usr/bin/python3
-# -*- coding: utf-8 -*-
-
-import getopt
-import sys
-import requests
-from os.path import getsize, basename
-from hashlib import md5
-
-__author__ = "Alfonso Tierno, alfonso.tiernosepulveda@telefonica.com"
-__date__ = "$2018-01-01$"
-__version__ = "0.1"
-version_date = "Jan 2018"
-
-
-def usage():
- print("Usage: ", sys.argv[0], "[options]")
- print(" --version: prints current version")
- print(" -f|--file FILE: file to be sent")
- print(" -h|--help: shows this help")
- print(" -u|--url URL: complete server URL")
- print(" -s|--chunk-size SIZE: size of chunks, by default 1000")
- print(" -t|--token TOKEN: Authorizaton token, previously obtained from server")
- print(" -v|--verbose print debug information, can be used several times")
- return
-
-
-if __name__ == "__main__":
- try:
- # load parameters and configuration
- opts, args = getopt.getopt(sys.argv[1:], "hvu:s:f:t:",
- ["url=", "help", "version", "verbose", "file=", "chunk-size=", "token="])
- url = None
- chunk_size = 500
- pkg_file = None
- verbose = 0
- token = None
-
- for o, a in opts:
- if o == "--version":
- print("upload version " + __version__ + ' ' + version_date)
- sys.exit()
- elif o in ("-v", "--verbose"):
- verbose += 1
- elif o in ("-h", "--help"):
- usage()
- sys.exit()
- elif o in ("-u", "--url"):
- url = a
- elif o in ("-s", "--chunk-size"):
- chunk_size = int(a)
- elif o in ("-f", "--file"):
- pkg_file = a
- elif o in ("-t", "--token"):
- token = a
- else:
- assert False, "Unhandled option"
- total_size = getsize(pkg_file)
- index = 0
- transaction_id = None
- file_md5 = md5()
- with open(pkg_file, 'rb') as f:
- headers = {
- "Content-type": "application/gzip",
- "Content-Filename": basename(pkg_file),
- "Accept": "application/json",
- }
- if token:
- headers["Authorization"] = token
- while index < total_size:
- chunk_data = f.read(chunk_size)
- file_md5.update(chunk_data)
- # payload = {"file_name": pkg_file, "chunk_data": base64.b64encode(chunk_data).decode("utf-8"),
- # "chunk_size": chunk_size}
- if transaction_id:
- headers["Transaction-Id"] = transaction_id
- if index+len(chunk_data) == total_size:
- headers["Content-File-MD5"] = file_md5.hexdigest()
- # payload["id"] = transaction_id
- headers["Content-range"] = "bytes {}-{}/{}".format(index, index+len(chunk_data)-1, total_size)
- # refers to rfc2616: https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html
- if verbose:
- print("TX chunk Headers: {}".format(headers))
- r = requests.post(url, data=chunk_data, headers=headers, verify=False)
- if r.status_code not in (200, 201):
- print("Got {}: {}".format(r.status_code, r.text))
- exit(1)
- if verbose > 1:
- print("RX {}: {}".format(r.status_code, r.text))
- response = r.json()
- if not transaction_id:
- transaction_id = response["id"]
- index += len(chunk_data)
- if verbose <= 1:
- print("RX {}: {}".format(r.status_code, r.text))
- if "id" in response:
- print("---\nid: {}".format(response["id"]))
- except Exception as e:
- raise
--- /dev/null
+nsd-catalog:
+ nsd:
+ - id: cirros_nsd
+ name: cirros_ns
+ short-name: cirros_ns
+ description: Generated by OSM pacakage generator
+ vendor: OSM
+ version: '1.0'
+
+ # Place the logo as png in icons directory and provide the name here
+ logo: osm_2x.png
+
+ # Specify the VNFDs that are part of this NSD
+ constituent-vnfd:
+ # The member-vnf-index needs to be unique, starting from 1
+ # vnfd-id-ref is the id of the VNFD
+ # Multiple constituent VNFDs can be specified
+ - member-vnf-index: 1
+ vnfd-id-ref: cirros_vnfd
+ scaling-group-descriptor:
+ - name: "scaling_cirros"
+ vnfd-member:
+ - count: 1
+ member-vnf-index-ref: 1
+ min-instance-count: 0
+ max-instance-count: 10
+ scaling-policy:
+ - scaling-type: "manual"
+ cooldown-time: 10
+ threshold-time: 10
+ name: manual_scale
+ vld:
+ # Networks for the VNFs
+ - id: cirros_nsd_vld1
+ name: cirros_nsd_vld1
+ type: ELAN
+ mgmt-network: 'true'
+ # vim-network-name: <update>
+ # provider-network:
+ # segmentation_id: <update>
+ vnfd-connection-point-ref:
+ # Specify the constituent VNFs
+ # member-vnf-index-ref - entry from constituent vnf
+ # vnfd-id-ref - VNFD id
+ # vnfd-connection-point-ref - connection point name in the VNFD
+ - member-vnf-index-ref: 1
+ vnfd-id-ref: cirros_vnfd
+ # NOTE: Validate the entry below
+ vnfd-connection-point-ref: eth0
--- /dev/null
+vnfd-catalog:
+ vnfd:
+ - id: cirros_vnfd
+ name: cirros_vnf
+ short-name: cirros_vnf
+ description: Simple VNF example with a cirros
+ vendor: OSM
+ version: '1.0'
+
+ # Place the logo as png in icons directory and provide the name here
+ logo: cirros-64.png
+
+ # Management interface
+ mgmt-interface:
+ cp: eth0
+
+ # Atleast one VDU need to be specified
+ vdu:
+ - id: cirros_vnfd-VM
+ name: cirros_vnfd-VM
+ description: cirros_vnfd-VM
+ count: 1
+
+ # Flavour of the VM to be instantiated for the VDU
+ # flavor below can fit into m1.micro
+ vm-flavor:
+ vcpu-count: 1
+ memory-mb: 256
+ storage-gb: 2
+
+ # Image/checksum or image including the full path
+ image: 'cirros034'
+ #checksum:
+
+ interface:
+ # Specify the external interfaces
+ # There can be multiple interfaces defined
+ - name: eth0
+ type: EXTERNAL
+ virtual-interface:
+ type: VIRTIO
+ bandwidth: '0'
+ vpci: 0000:00:0a.0
+ external-connection-point-ref: eth0
+
+ connection-point:
+ - name: eth0
+ type: VPORT
--- /dev/null
+#! /bin/bash
+
+export NBI_URL=https://localhost:9999/osm
+USERNAME=admin
+PASSWORD=admin
+PROJECT=admin
+VIM=ost2-mrt-tid #OST2_MRT #ost2-mrt-tid
+
+DESCRIPTORS=/home/ubuntu/descriptors #../local/descriptors
+DESCRIPTORS=../local/descriptors
+
+VNFD1=${DESCRIPTORS}/ping_vnf.tar.gz
+VNFD2=${DESCRIPTORS}/pong_vnf.tar.gz
+VNFD3=${DESCRIPTORS}/cirros_vnfd.yaml
+
+NSD1=${DESCRIPTORS}/ping_pong_ns.tar.gz
+NSD2=${DESCRIPTORS}/cirros_2vnf_ns.tar.gz
+NSD3=${DESCRIPTORS}/cirros_nsd.yaml
+
+[ -f "$VNFD1" ] || ! echo "not found ping_vnf.tar.gz. Set DESCRIPTORS variable to a proper location" || exit 1
+[ -f "$VNFD2" ] || ! echo "not found pong_vnf.tar.gz. Set DESCRIPTORS variable to a proper location" || exit 1
+[ -f "$VNFD3" ] || ! echo "not found cirros_vnfd.yaml. Set DESCRIPTORS variable to a proper location" || exit 1
+[ -f "$NSD1" ] || ! echo "not found ping_pong_ns.tar.gz. Set DESCRIPTORS variable to a proper location" || exit 1
+[ -f "$NSD2" ] || ! echo "not found cirros_2vnf_ns.tar.gz. Set DESCRIPTORS variable to a proper location" || exit 1
+[ -f "$NSD3" ] || ! echo "not found cirros_nsd.yaml. Set DESCRIPTORS variable to a proper location" || exit 1
+
+#get token
+TOKEN=`curl --insecure -H "Content-Type: application/yaml" -H "Accept: application/yaml" \
+ --data "{username: $USERNAME, password: $PASSWORD, project_id: $PROJECT}" ${NBI_URL}/admin/v1/tokens \
+ 2>/dev/null | awk '($1=="id:"){print $2}'`;
+echo token: $TOKEN
+
+
+# VNFD
+#########
+#insert PKG
+VNFD1_ID=`curl --insecure -w "%{http_code}\n" -H "Content-Type: application/gzip" -H "Accept: application/yaml" \
+ -H "Authorization: Bearer $TOKEN" --data-binary "@$VNFD1" ${NBI_URL}/vnfpkgm/v1/vnf_packages_content \
+ 2>/dev/null | awk '($1=="id:"){print $2}'`
+echo ping_vnfd: $VNFD1_ID
+
+VNFD2_ID=`curl --insecure -w "%{http_code}\n" -H "Content-Type: application/gzip" -H "Accept: application/yaml" \
+ -H "Authorization: Bearer $TOKEN" --data-binary "@$VNFD2" ${NBI_URL}/vnfpkgm/v1/vnf_packages_content \
+ 2>/dev/null | awk '($1=="id:"){print $2}'`
+echo pong_vnfd: $VNFD2_ID
+
+
+
+# NSD
+#########
+#insert PKG
+NSD1_ID=`curl --insecure -w "%{http_code}\n" -H "Content-Type: application/gzip" -H "Accept: application/yaml" \
+ -H "Authorization: Bearer $TOKEN" --data-binary "@$NSD1" ${NBI_URL}/nsd/v1/ns_descriptors_content \
+ 2>/dev/null | awk '($1=="id:"){print $2}'`
+echo ping_pong_nsd: $NSD1_ID
+
+
+# NSRS
+##############
+#add nsr
+NSR1_ID=`curl --insecure -w "%{http_code}\n" -H "Content-Type: application/yaml" -H "Accept: application/yaml" \
+ -H "Authorization: Bearer $TOKEN" --data "{ nsDescription: default description, nsName: NSNAME, nsdId: $NSD1_ID, \
+ ssh-authorized-key: [ {key-pair-ref: gerardo}, {key-pair-ref: alfonso}], vimAccountId: $VIM }" \
+ ${NBI_URL}/nslcm/v1/ns_instances_content 2>/dev/null | awk '($1=="id:"){print $2}'` ;
+echo ping_pong_nsr: $NSR1_ID
+
+
+echo 'curl --insecure -w "%{http_code}\n" -H "Content-Type: application/yaml" -H "Accept: application/yaml"' \
+ '-H "Authorization: Bearer '$TOKEN'" '${NBI_URL}'/nslcm/v1/ns_instances_content/'$NSR1_ID' 2>/dev/null | ' \
+ 'grep -e detailed-status -e operational-status -e config-status'
+
+
+
+
--- /dev/null
+#! /bin/bash
+
+export NBI_URL=https://localhost:9999/osm
+USERNAME=admin
+PASSWORD=admin
+PROJECT=admin
+
+
+
+#get token
+TOKEN=`curl --insecure -H "Content-Type: application/yaml" -H "Accept: application/yaml" --data "{username: $USERNAME, password: $PASSWORD, project_id: $PROJECT}" ${NBI_URL}/admin/v1/tokens 2>/dev/null | awk '($1=="id:"){print $2}' ` ; echo $TOKEN
+
+
+echo deleting all
+#DELETE ALL
+
+for url_item in nslcm/v1/ns_instances nsd/v1/ns_descriptors_content vnfpkgm/v1/vnf_packages_content
+do
+ for ITEM_ID in `curl --insecure -w "%{http_code}\n" -H "Content-Type: application/yaml" -H "Accept: application/yaml" -H "Authorization: Bearer $TOKEN" ${NBI_URL}/${url_item} 2>/dev/null | awk '($1=="_id:") {print $2}'` ;
+ do
+ curl --insecure -w "%{http_code}\n" -H "Content-Type: application/yaml" -H "Accept: application/yaml" -H "Authorization: Bearer $TOKEN" ${NBI_URL}/${url_item}/$ITEM_ID -X DELETE
+ done
+done
+
+# curl --insecure ${NBI_URL}/test/prune
+
--- /dev/null
+#! /usr/bin/python3
+# -*- coding: utf-8 -*-
+
+import getopt
+import sys
+import requests
+import json
+import logging
+import yaml
+# import json
+import tarfile
+from os import makedirs
+
+__author__ = "Alfonso Tierno, alfonso.tiernosepulveda@telefonica.com"
+__date__ = "$2018-03-01$"
+__version__ = "0.1"
+version_date = "Mar 2018"
+
+
+def usage():
+ print("Usage: ", sys.argv[0], "[options]")
+ print(" --version: prints current version")
+ print(" -f|--file FILE: file to be sent")
+ print(" -h|--help: shows this help")
+ print(" -u|--url URL: complete server URL")
+ print(" -s|--chunk-size SIZE: size of chunks, by default 1000")
+ print(" -t|--token TOKEN: Authorizaton token, previously obtained from server")
+ print(" -v|--verbose print debug information, can be used several times")
+ return
+
+
+r_header_json = {"Content-type": "application/json"}
+headers_json = {
+ "Content-type": "application/json",
+ "Accept": "application/json",
+}
+r_header_yaml = {"Content-type": "application/yaml"}
+headers_yaml = {
+ "Content-type": "application/yaml",
+ "Accept": "application/yaml",
+}
+r_header_text = {"Content-type": "text/plain"}
+r_header_octect = {"Content-type": "application/octet-stream"}
+headers_text = {
+ "Accept": "text/plain",
+}
+r_header_zip = {"Content-type": "application/zip"}
+headers_zip = {
+ "Accept": "application/zip",
+}
+# test without authorization
+test_not_authorized_list = (
+ ("NA1", "Invalid token", "GET", "/admin/v1/users", headers_json, None, 401, r_header_json, "json"),
+ ("NA2", "Invalid URL", "POST", "/admin/v1/nonexist", headers_yaml, None, 405, r_header_yaml, "yaml"),
+ ("NA3", "Invalid version", "DELETE", "/admin/v2/users", headers_yaml, None, 405, r_header_yaml, "yaml"),
+)
+
+# test ones authorized
+test_authorized_list = (
+ ("AU1", "Invalid vnfd id", "GET", "/vnfpkgm/v1/vnf_packages/non-existing-id",
+ headers_json, None, 404, r_header_json, "json"),
+ ("AU2", "Invalid nsd id", "GET", "/nsd/v1/ns_descriptors/non-existing-id",
+ headers_yaml, None, 404, r_header_yaml, "yaml"),
+ ("AU3", "Invalid nsd id", "DELETE", "/nsd/v1/ns_descriptors_content/non-existing-id",
+ headers_yaml, None, 404, r_header_yaml, "yaml"),
+)
+
+vim = {
+ "schema_version": "1.0",
+ "schema_type": "No idea",
+ "name": "myVim",
+ "description": "Descriptor name",
+ "vim_type": "openstack",
+ "vim_url": "http://localhost:/vim",
+ "vim_tenant_name": "vimTenant",
+ "vim_user": "user",
+ "vim_password": "password",
+ "config": {"config_param": 1}
+}
+
+vim_bad = vim.copy()
+vim_bad.pop("name")
+
+test_admin_list1 = (
+ ("VIM1", "Create VIM", "POST", "/admin/v1/vim_accounts", headers_json, vim, (201, 204),
+ {"Location": "/admin/v1/vim_accounts/", "Content-Type": "application/json"}, "json"),
+ ("VIM2", "Create VIM bad schema", "POST", "/admin/v1/vim_accounts", headers_json, vim_bad, 422, None, headers_json),
+ ("VIM2", "Create VIM name repeated", "POST", "/admin/v1/vim_accounts", headers_json, vim, 409, None, headers_json),
+ ("VIM4", "Show VIMs", "GET", "/admin/v1/vim_accounts", headers_yaml, None, 200, r_header_yaml, "yaml"),
+ ("VIM5", "Show VIM", "GET", "/admin/v1/vim_accounts/{VIM1}", headers_yaml, None, 200, r_header_yaml, "yaml"),
+ ("VIM6", "Delete VIM", "DELETE", "/admin/v1/vim_accounts/{VIM1}", headers_yaml, None, 202, None, 0),
+)
+
+
+class TestException(Exception):
+ pass
+
+
+class TestRest:
+ def __init__(self, url_base, header_base={}, verify=False):
+ self.url_base = url_base
+ self.header_base = header_base
+ self.s = requests.session()
+ self.s.headers = header_base
+ self.verify = verify
+ # contains ID of tests obtained from Location response header. "" key contains last obtained id
+ self.test_ids = {}
+
+ def set_header(self, header):
+ self.s.headers.update(header)
+
+ def test(self, name, description, method, url, headers, payload, expected_codes, expected_headers,
+ expected_payload):
+ """
+ Performs an http request and check http code response. Exit if different than allowed. It get the returned id
+ that can be used by following test in the URL with {name} where name is the name of the test
+ :param name: short name of the test
+ :param description: description of the test
+ :param method: HTTP method: GET,PUT,POST,DELETE,...
+ :param url: complete URL or relative URL
+ :param headers: request headers to add to the base headers
+ :param payload: Can be a dict, transformed to json, a text or a file if starts with '@'
+ :param expected_codes: expected response codes, can be int, int tuple or int range
+ :param expected_headers: expected response headers, dict with key values
+ :param expected_payload: expected payload, 0 if empty, 'yaml', 'json', 'text', 'zip'
+ :return: requests response
+ """
+ try:
+ if not self.s:
+ self.s = requests.session()
+ # URL
+ if not url:
+ url = self.url_base
+ elif not url.startswith("http"):
+ url = self.url_base + url
+
+ var_start = url.find("{") + 1
+ while var_start:
+ var_end = url.find("}", var_start)
+ if var_end == -1:
+ break
+ var_name = url[var_start:var_end]
+ if var_name in self.test_ids:
+ url = url[:var_start-1] + self.test_ids[var_name] + url[var_end+1:]
+ var_start += len(self.test_ids[var_name])
+ var_start = url.find("{", var_start) + 1
+ if payload:
+ if isinstance(payload, str):
+ if payload.startswith("@"):
+ mode = "r"
+ file_name = payload[1:]
+ if payload.startswith("@b"):
+ mode = "rb"
+ file_name = payload[2:]
+ with open(file_name, mode) as f:
+ payload = f.read()
+ elif isinstance(payload, dict):
+ payload = json.dumps(payload)
+
+ test = "Test {} {} {} {}".format(name, description, method, url)
+ logger.warning(test)
+ stream = False
+ # if expected_payload == "zip":
+ # stream = True
+ r = getattr(self.s, method.lower())(url, data=payload, headers=headers, verify=self.verify, stream=stream)
+ logger.debug("RX {}: {}".format(r.status_code, r.text))
+
+ # check response
+ if expected_codes:
+ if isinstance(expected_codes, int):
+ expected_codes = (expected_codes,)
+ if r.status_code not in expected_codes:
+ raise TestException(
+ "Got status {}. Expected {}. {}".format(r.status_code, expected_codes, r.text))
+
+ if expected_headers:
+ for header_key, header_val in expected_headers.items():
+ if header_key.lower() not in r.headers:
+ raise TestException("Header {} not present".format(header_key))
+ if header_val and header_val.lower() not in r.headers[header_key]:
+ raise TestException("Header {} does not contain {} but {}".format(header_key, header_val,
+ r.headers[header_key]))
+
+ if expected_payload is not None:
+ if expected_payload == 0 and len(r.content) > 0:
+ raise TestException("Expected empty payload")
+ elif expected_payload == "json":
+ try:
+ r.json()
+ except Exception as e:
+ raise TestException("Expected json response payload, but got Exception {}".format(e))
+ elif expected_payload == "yaml":
+ try:
+ yaml.safe_load(r.text)
+ except Exception as e:
+ raise TestException("Expected yaml response payload, but got Exception {}".format(e))
+ elif expected_payload == "zip":
+ if len(r.content) == 0:
+ raise TestException("Expected some response payload, but got empty")
+ # try:
+ # tar = tarfile.open(None, 'r:gz', fileobj=r.raw)
+ # for tarinfo in tar:
+ # tarname = tarinfo.name
+ # print(tarname)
+ # except Exception as e:
+ # raise TestException("Expected zip response payload, but got Exception {}".format(e))
+ elif expected_payload == "text":
+ if len(r.content) == 0:
+ raise TestException("Expected some response payload, but got empty")
+ # r.text
+ location = r.headers.get("Location")
+ if location:
+ _id = location[location.rfind("/") + 1:]
+ if _id:
+ self.test_ids[name] = str(_id)
+ self.test_ids[""] = str(_id) # last id
+ return r
+ except TestException as e:
+ logger.error("{} \nRX code{}: {}".format(e, r.status_code, r.text))
+ exit(1)
+ except IOError as e:
+ logger.error("Cannot open file {}".format(e))
+ exit(1)
+
+
+if __name__ == "__main__":
+ global logger
+ test = ""
+
+ # Disable warnings from self-signed certificates.
+ requests.packages.urllib3.disable_warnings()
+ try:
+ logging.basicConfig(format="%(levelname)s %(message)s", level=logging.ERROR)
+ logger = logging.getLogger('NBI')
+ # load parameters and configuration
+ opts, args = getopt.getopt(sys.argv[1:], "hvu:p:",
+ ["url=", "user=", "password=", "help", "version", "verbose", "project=", "insecure"])
+ url = "https://localhost:9999/osm"
+ user = password = project = "admin"
+ verbose = 0
+ verify = True
+
+ for o, a in opts:
+ if o == "--version":
+ print("test version " + __version__ + ' ' + version_date)
+ sys.exit()
+ elif o in ("-v", "--verbose"):
+ verbose += 1
+ elif o == "no-verbose":
+ verbose = -1
+ elif o in ("-h", "--help"):
+ usage()
+ sys.exit()
+ elif o == "--url":
+ url = a
+ elif o in ("-u", "--user"):
+ user = a
+ elif o in ("-p", "--password"):
+ password = a
+ elif o == "--project":
+ project = a
+ elif o == "--insecure":
+ verify = False
+ else:
+ assert False, "Unhandled option"
+ if verbose == 0:
+ logger.setLevel(logging.WARNING)
+ elif verbose > 1:
+ logger.setLevel(logging.DEBUG)
+ else:
+ logger.setLevel(logging.ERROR)
+
+ test_rest = TestRest(url)
+
+ # tests without authorization
+ for t in test_not_authorized_list:
+ test_rest.test(*t)
+
+ # get token
+ r = test_rest.test("token1", "Obtain token", "POST", "/admin/v1/tokens", headers_json,
+ {"username": user, "password": password, "project_id": project},
+ (200, 201), {"Content-Type": "application/json"}, "json")
+ response = r.json()
+ token = response["id"]
+ test_rest.set_header({"Authorization": "Bearer {}".format(token)})
+
+ # tests once authorized
+ for t in test_authorized_list:
+ test_rest.test(*t)
+
+ # tests admin
+ for t in test_admin_list1:
+ test_rest.test(*t)
+
+ # vnfd CREATE
+ r = test_rest.test("VNFD1", "Onboard VNFD step 1", "POST", "/vnfpkgm/v1/vnf_packages", headers_json, None,
+ 201, {"Location": "/vnfpkgm/v1/vnf_packages/", "Content-Type": "application/json"}, "json")
+ location = r.headers["Location"]
+ vnfd_id = location[location.rfind("/")+1:]
+ # print(location, vnfd_id)
+
+ # vnfd UPLOAD test
+ r = test_rest.test("VNFD2", "Onboard VNFD step 2 as TEXT", "PUT",
+ "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
+ r_header_text, "@./cirros_vnf/cirros_vnfd.yaml", 204, None, 0)
+
+ # vnfd SHOW OSM format
+ r = test_rest.test("VNFD3", "Show VNFD OSM format", "GET",
+ "/vnfpkgm/v1/vnf_packages_content/{}".format(vnfd_id),
+ headers_json, None, 200, r_header_json, "json")
+
+ # vnfd SHOW text
+ r = test_rest.test("VNFD4", "Show VNFD SOL005 text", "GET",
+ "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
+ headers_text, None, 200, r_header_text, "text")
+
+ # vnfd UPLOAD ZIP
+ makedirs("temp", exist_ok=True)
+ tar = tarfile.open("temp/cirros_vnf.tar.gz", "w:gz")
+ tar.add("cirros_vnf")
+ tar.close()
+ r = test_rest.test("VNFD5", "Onboard VNFD step 3 replace with ZIP", "PUT",
+ "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
+ r_header_zip, "@b./temp/cirros_vnf.tar.gz", 204, None, 0)
+
+ # vnfd SHOW OSM format
+ r = test_rest.test("VNFD6", "Show VNFD OSM format", "GET",
+ "/vnfpkgm/v1/vnf_packages_content/{}".format(vnfd_id),
+ headers_json, None, 200, r_header_json, "json")
+
+ # vnfd SHOW zip
+ r = test_rest.test("VNFD7", "Show VNFD SOL005 zip", "GET",
+ "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
+ headers_zip, None, 200, r_header_zip, "zip")
+ # vnfd SHOW descriptor
+ r = test_rest.test("VNFD8", "Show VNFD descriptor", "GET",
+ "/vnfpkgm/v1/vnf_packages/{}/vnfd".format(vnfd_id),
+ headers_text, None, 200, r_header_text, "text")
+ # vnfd SHOW actifact
+ r = test_rest.test("VNFD9", "Show VNFD artifact", "GET",
+ "/vnfpkgm/v1/vnf_packages/{}/artifacts/icons/cirros-64.png".format(vnfd_id),
+ headers_text, None, 200, r_header_octect, "text")
+
+ # # vnfd DELETE
+ # r = test_rest.test("VNFD10", "Delete VNFD SOL005 text", "DELETE",
+ # "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id),
+ # headers_yaml, None, 204, None, 0)
+
+ # nsd CREATE
+ r = test_rest.test("NSD1", "Onboard NSD step 1", "POST", "/nsd/v1/ns_descriptors", headers_json, None,
+ 201, {"Location": "/nsd/v1/ns_descriptors/", "Content-Type": "application/json"}, "json")
+ location = r.headers["Location"]
+ nsd_id = location[location.rfind("/")+1:]
+ # print(location, nsd_id)
+
+ # nsd UPLOAD test
+ r = test_rest.test("NSD2", "Onboard NSD with missing vnfd", "PUT",
+ "/nsd/v1/ns_descriptors/{}/nsd_content?constituent-vnfd.0.vnfd-id-ref"
+ "=NONEXISTING-VNFD".format(nsd_id),
+ r_header_text, "@./cirros_ns/cirros_nsd.yaml", 409, r_header_yaml, "yaml")
+
+ # # VNF_CREATE
+ # r = test_rest.test("VNFD5", "Onboard VNFD step 3 replace with ZIP", "PUT",
+ # "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
+ # r_header_zip, "@b./temp/cirros_vnf.tar.gz", 204, None, 0)
+
+ r = test_rest.test("NSD2", "Onboard NSD step 2 as TEXT", "PUT",
+ "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
+ r_header_text, "@./cirros_ns/cirros_nsd.yaml", 204, None, 0)
+
+ # nsd SHOW OSM format
+ r = test_rest.test("NSD3", "Show NSD OSM format", "GET", "/nsd/v1/ns_descriptors_content/{}".format(nsd_id),
+ headers_json, None, 200, r_header_json, "json")
+
+ # nsd SHOW text
+ r = test_rest.test("NSD4", "Show NSD SOL005 text", "GET",
+ "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
+ headers_text, None, 200, r_header_text, "text")
+
+ # nsd UPLOAD ZIP
+ makedirs("temp", exist_ok=True)
+ tar = tarfile.open("temp/cirros_ns.tar.gz", "w:gz")
+ tar.add("cirros_ns")
+ tar.close()
+ r = test_rest.test("NSD5", "Onboard NSD step 3 replace with ZIP", "PUT",
+ "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
+ r_header_zip, "@b./temp/cirros_ns.tar.gz", 204, None, 0)
+
+ # nsd SHOW OSM format
+ r = test_rest.test("NSD6", "Show NSD OSM format", "GET", "/nsd/v1/ns_descriptors_content/{}".format(nsd_id),
+ headers_json, None, 200, r_header_json, "json")
+
+ # nsd SHOW zip
+ r = test_rest.test("NSD7", "Show NSD SOL005 zip", "GET", "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
+ headers_zip, None, 200, r_header_zip, "zip")
+
+ # nsd SHOW descriptor
+ r = test_rest.test("NSD8", "Show NSD descriptor", "GET", "/nsd/v1/ns_descriptors/{}/nsd".format(nsd_id),
+ headers_text, None, 200, r_header_text, "text")
+ # nsd SHOW actifact
+ r = test_rest.test("NSD9", "Show NSD artifact", "GET",
+ "/nsd/v1/ns_descriptors/{}/artifacts/icons/osm_2x.png".format(nsd_id),
+ headers_text, None, 200, r_header_octect, "text")
+
+ # vnfd DELETE
+ r = test_rest.test("VNFD10", "Delete VNFD conflict", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id),
+ headers_yaml, None, 409, r_header_yaml, "yaml")
+
+ # nsd DELETE
+ r = test_rest.test("NSD10", "Delete NSD SOL005 text", "DELETE", "/nsd/v1/ns_descriptors/{}".format(nsd_id),
+ headers_yaml, None, 204, None, 0)
+
+ # vnfd DELETE
+ r = test_rest.test("VNFD10", "Delete VNFD SOL005 text", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id),
+ headers_yaml, None, 204, None, 0)
+
+ print("PASS")
+
+ except Exception as e:
+ if test:
+ logger.error(test + " Exception: " + str(e))
+ exit(1)
+ else:
+ logger.critical(test + " Exception: " + str(e), exc_info=True)
--- /dev/null
+#! /usr/bin/python3
+# -*- coding: utf-8 -*-
+
+import getopt
+import sys
+import requests
+from os.path import getsize, basename
+from hashlib import md5
+
+__author__ = "Alfonso Tierno, alfonso.tiernosepulveda@telefonica.com"
+__date__ = "$2018-01-01$"
+__version__ = "0.1"
+version_date = "Jan 2018"
+
+
+def usage():
+ print("Usage: ", sys.argv[0], "[options]")
+ print(" --version: prints current version")
+ print(" -f|--file FILE: file to be sent")
+ print(" -h|--help: shows this help")
+ print(" -u|--url URL: complete server URL")
+ print(" -s|--chunk-size SIZE: size of chunks, by default 1000")
+ print(" -t|--token TOKEN: Authorizaton token, previously obtained from server")
+ print(" -v|--verbose print debug information, can be used several times")
+ return
+
+
+if __name__ == "__main__":
+ try:
+ # load parameters and configuration
+ opts, args = getopt.getopt(sys.argv[1:], "hvu:s:f:t:",
+ ["url=", "help", "version", "verbose", "file=", "chunk-size=", "token="])
+ url = None
+ chunk_size = 500
+ pkg_file = None
+ verbose = 0
+ token = None
+
+ for o, a in opts:
+ if o == "--version":
+ print("upload version " + __version__ + ' ' + version_date)
+ sys.exit()
+ elif o in ("-v", "--verbose"):
+ verbose += 1
+ elif o in ("-h", "--help"):
+ usage()
+ sys.exit()
+ elif o in ("-u", "--url"):
+ url = a
+ elif o in ("-s", "--chunk-size"):
+ chunk_size = int(a)
+ elif o in ("-f", "--file"):
+ pkg_file = a
+ elif o in ("-t", "--token"):
+ token = a
+ else:
+ assert False, "Unhandled option"
+ total_size = getsize(pkg_file)
+ index = 0
+ transaction_id = None
+ file_md5 = md5()
+ with open(pkg_file, 'rb') as f:
+ headers = {
+ "Content-type": "application/gzip",
+ "Content-Filename": basename(pkg_file),
+ "Accept": "application/json",
+ }
+ if token:
+ headers["Authorization"] = token
+ while index < total_size:
+ chunk_data = f.read(chunk_size)
+ file_md5.update(chunk_data)
+ # payload = {"file_name": pkg_file, "chunk_data": base64.b64encode(chunk_data).decode("utf-8"),
+ # "chunk_size": chunk_size}
+ if transaction_id:
+ headers["Transaction-Id"] = transaction_id
+ if index+len(chunk_data) == total_size:
+ headers["Content-File-MD5"] = file_md5.hexdigest()
+ # payload["id"] = transaction_id
+ headers["Content-range"] = "bytes {}-{}/{}".format(index, index+len(chunk_data)-1, total_size)
+ # refers to rfc2616: https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html
+ if verbose:
+ print("TX chunk Headers: {}".format(headers))
+ r = requests.post(url, data=chunk_data, headers=headers, verify=False)
+ if r.status_code not in (200, 201):
+ print("Got {}: {}".format(r.status_code, r.text))
+ exit(1)
+ if verbose > 1:
+ print("RX {}: {}".format(r.status_code, r.text))
+ response = r.json()
+ if not transaction_id:
+ transaction_id = response["id"]
+ index += len(chunk_data)
+ if verbose <= 1:
+ print("RX {}: {}".format(r.status_code, r.text))
+ if "id" in response:
+ print("---\nid: {}".format(response["id"]))
+ except Exception as e:
+ raise
basepython = python3
deps = flake8
commands =
- flake8 setup.py --max-line-length 120 --exclude .svn,CVS,.gz,.git,__pycache__,.tox,local,temp --ignore W291,W293
+ flake8 osm_nbi --max-line-length 120 --exclude .svn,CVS,.gz,.git,__pycache__,.tox,local,temp,vnfd_catalog.py,nsd_catalog.py --ignore W291,W293,E226
[testenv:build]
basepython = python3