2 # -*- coding: utf-8 -*-
8 #from os.path import getsize, basename
9 #from hashlib import md5
15 from os
import makedirs
16 from copy
import deepcopy
18 __author__
= "Alfonso Tierno, alfonso.tiernosepulveda@telefonica.com"
19 __date__
= "$2018-03-01$"
21 version_date
= "Mar 2018"
25 print("Usage: ", sys
.argv
[0], "[options]")
26 print(" --version: prints current version")
27 print(" -f|--file FILE: file to be sent")
28 print(" -h|--help: shows this help")
29 print(" -u|--url URL: complete server URL")
30 print(" -s|--chunk-size SIZE: size of chunks, by default 1000")
31 print(" -t|--token TOKEN: Authorizaton token, previously obtained from server")
32 print(" -v|--verbose print debug information, can be used several times")
36 r_header_json
= {"Content-type": "application/json"}
38 "Content-type": "application/json",
39 "Accept": "application/json",
41 r_header_yaml
= {"Content-type": "application/yaml"}
43 "Content-type": "application/yaml",
44 "Accept": "application/yaml",
46 r_header_text
= {"Content-type": "text/plain"}
47 r_header_octect
= {"Content-type": "application/octet-stream"}
49 "Accept": "text/plain",
51 r_header_zip
= {"Content-type": "application/zip"}
53 "Accept": "application/zip",
55 # test without authorization
56 test_not_authorized_list
= (
57 ("Invalid token", "GET", "/admin/v1/users", headers_json
, None, 401, r_header_json
, "json"),
58 ("Invalid URL", "POST", "/admin/v1/nonexist", headers_yaml
, None, 405, r_header_yaml
, "yaml"),
59 ("Invalid version", "DELETE", "/admin/v2/users", headers_yaml
, None, 405, r_header_yaml
, "yaml"),
62 # test ones authorized
63 test_authorized_list
= (
64 ("Invalid vnfd id", "GET", "/vnfpkgm/v1/vnf_packages/non-existing-id", headers_json
, None, 404, r_header_json
, "json"),
65 ("Invalid nsd id", "GET", "/nsd/v1/ns_descriptors/non-existing-id", headers_yaml
, None, 404, r_header_yaml
, "yaml"),
66 ("Invalid nsd id", "DELETE", "/nsd/v1/ns_descriptors_content/non-existing-id", headers_yaml
, None, 404, r_header_yaml
, "yaml"),
69 class TestException(Exception):
74 def __init__(self
, url_base
, header_base
={}, verify
=False):
75 self
.url_base
= url_base
76 self
.header_base
= header_base
77 self
.s
= requests
.session()
78 self
.s
.headers
= header_base
81 def set_header(self
, header
):
82 self
.s
.headers
.update(header
)
84 def test(self
, name
, method
, url
, headers
, payload
, expected_codes
, expected_headers
, expected_payload
):
86 Performs an http request and check http code response. Exit if different than allowed
87 :param name: name of the test
88 :param method: HTTP method: GET,PUT,POST,DELETE,...
89 :param url: complete URL or relative URL
90 :param headers: request headers to add to the base headers
91 :param payload: Can be a dict, transformed to json, a text or a file if starts with '@'
92 :param expected_codes: expected response codes, can be int, int tuple or int range
93 :param expected_headers: expected response headers, dict with key values
94 :param expected_payload: expected payload, 0 if empty, 'yaml', 'json', 'text', 'zip'
99 self
.s
= requests
.session()
102 elif not url
.startswith("http"):
103 url
= self
.url_base
+ url
105 if isinstance(payload
, str):
106 if payload
.startswith("@"):
108 file_name
= payload
[1:]
109 if payload
.startswith("@b"):
111 file_name
= payload
[2:]
112 with
open(file_name
, mode
) as f
:
114 elif isinstance(payload
, dict):
115 payload
= json
.dumps(payload
)
117 test
= "Test {} {} {}".format(name
, method
, url
)
120 # if expected_payload == "zip":
122 r
= getattr(self
.s
, method
.lower())(url
, data
=payload
, headers
=headers
, verify
=self
.verify
, stream
=stream
)
123 logger
.debug("RX {}: {}".format(r
.status_code
, r
.text
))
127 if isinstance(expected_codes
, int):
128 expected_codes
= (expected_codes
,)
129 if r
.status_code
not in expected_codes
:
131 "Got status {}. Expected {}. {}".format(r
.status_code
, expected_codes
, r
.text
))
134 for header_key
, header_val
in expected_headers
.items():
135 if header_key
.lower() not in r
.headers
:
136 raise TestException("Header {} not present".format(header_key
))
137 if header_val
and header_val
.lower() not in r
.headers
[header_key
]:
138 raise TestException("Header {} does not contain {} but {}".format(header_key
, header_val
,
139 r
.headers
[header_key
]))
141 if expected_payload
is not None:
142 if expected_payload
== 0 and len(r
.content
) > 0:
143 raise TestException("Expected empty payload")
144 elif expected_payload
== "json":
147 except Exception as e
:
148 raise TestException("Expected json response payload, but got Exception {}".format(e
))
149 elif expected_payload
== "yaml":
151 yaml
.safe_load(r
.text
)
152 except Exception as e
:
153 raise TestException("Expected yaml response payload, but got Exception {}".format(e
))
154 elif expected_payload
== "zip":
155 if len(r
.content
) == 0:
156 raise TestException("Expected some response payload, but got empty")
158 # tar = tarfile.open(None, 'r:gz', fileobj=r.raw)
159 # for tarinfo in tar:
160 # tarname = tarinfo.name
162 # except Exception as e:
163 # raise TestException("Expected zip response payload, but got Exception {}".format(e))
164 elif expected_payload
== "text":
165 if len(r
.content
) == 0:
166 raise TestException("Expected some response payload, but got empty")
169 except TestException
as e
:
170 logger
.error("{} \nRX code{}: {}".format(e
, r
.status_code
, r
.text
))
173 logger
.error("Cannot open file {}".format(e
))
177 if __name__
== "__main__":
181 logging
.basicConfig(format
="%(levelname)s %(message)s", level
=logging
.ERROR
)
182 logger
= logging
.getLogger('NBI')
183 # load parameters and configuration
184 opts
, args
= getopt
.getopt(sys
.argv
[1:], "hvu:p:",
185 ["url=", "user=", "password=", "help", "version", "verbose", "project=", "insecure"])
186 url
= "https://localhost:9999/osm"
187 user
= password
= project
= "admin"
193 print ("test version " + __version__
+ ' ' + version_date
)
195 elif o
in ("-v", "--verbose"):
197 elif o
in ("no-verbose"):
199 elif o
in ("-h", "--help"):
204 elif o
in ("-u", "--user"):
206 elif o
in ("-p", "--password"):
208 elif o
in ("--project"):
210 elif o
in ("--insecure"):
213 assert False, "Unhandled option"
215 logger
.setLevel(logging
.WARNING
)
217 logger
.setLevel(logging
.DEBUG
)
219 logger
.setLevel(logging
.ERROR
)
221 test_rest
= TestRest(url
)
223 # tests without authorization
224 for t
in test_not_authorized_list
:
228 r
= test_rest
.test("Obtain token", "POST", "/admin/v1/tokens", headers_json
,
229 {"username": user
, "password": password
, "project_id": project
},
230 (200, 201), {"Content-Type": "application/json"}, "json")
232 token
= response
["id"]
233 test_rest
.set_header({"Authorization": "Bearer {}".format(token
)})
235 # tests once authorized
236 for t
in test_authorized_list
:
240 r
= test_rest
.test("Onboard NSD step 1", "POST", "/nsd/v1/ns_descriptors", headers_json
, None,
241 201, {"Location": "/nsd/v1/ns_descriptors/", "Content-Type": "application/json"}, "json")
242 location
= r
.headers
["Location"]
243 nsd_id
= location
[location
.rfind("/")+1:]
244 # print(location, nsd_id)
247 r
= test_rest
.test("Onboard NSD step 2 as TEXT", "PUT", "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id
),
248 r_header_text
, "@./cirros_ns/cirros_nsd.yaml", 204, None, 0)
250 # nsd SHOW OSM format
251 r
= test_rest
.test("Show NSD OSM format", "GET", "/nsd/v1/ns_descriptors_content/{}".format(nsd_id
),
252 headers_json
, None, 200, r_header_json
, "json")
255 r
= test_rest
.test("Show NSD SOL005 text", "GET", "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id
),
256 headers_text
, None, 200, r_header_text
, "text")
259 makedirs("temp", exist_ok
=True)
260 tar
= tarfile
.open("temp/cirros_ns.tar.gz", "w:gz")
263 r
= test_rest
.test("Onboard NSD step 3 replace with ZIP", "PUT", "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id
),
264 r_header_zip
, "@b./temp/cirros_ns.tar.gz", 204, None, 0)
266 # nsd SHOW OSM format
267 r
= test_rest
.test("Show NSD OSM format", "GET", "/nsd/v1/ns_descriptors_content/{}".format(nsd_id
),
268 headers_json
, None, 200, r_header_json
, "json")
271 r
= test_rest
.test("Show NSD SOL005 zip", "GET", "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id
),
272 headers_zip
, None, 200, r_header_zip
, "zip")
274 # nsd SHOW descriptor
275 r
= test_rest
.test("Show NSD descriptor", "GET", "/nsd/v1/ns_descriptors/{}/nsd".format(nsd_id
),
276 headers_text
, None, 200, r_header_text
, "text")
278 r
= test_rest
.test("Show NSD artifact", "GET", "/nsd/v1/ns_descriptors/{}/artifacts/icons/osm_2x.png".format(nsd_id
),
279 headers_text
, None, 200, r_header_octect
, "text")
282 r
= test_rest
.test("Delete NSD SOL005 text", "DELETE", "/nsd/v1/ns_descriptors/{}".format(nsd_id
),
283 headers_yaml
, None, 204, None, 0)
286 r
= test_rest
.test("Onboard VNFD step 1", "POST", "/vnfpkgm/v1/vnf_packages", headers_json
, None,
287 201, {"Location": "/vnfpkgm/v1/vnf_packages/", "Content-Type": "application/json"}, "json")
288 location
= r
.headers
["Location"]
289 vnfd_id
= location
[location
.rfind("/")+1:]
290 # print(location, vnfd_id)
293 r
= test_rest
.test("Onboard VNFD step 2 as TEXT", "PUT", "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id
),
294 r_header_text
, "@./cirros_vnf/cirros_vnfd.yaml", 204, None, 0)
296 # vnfd SHOW OSM format
297 r
= test_rest
.test("Show VNFD OSM format", "GET", "/vnfpkgm/v1/vnf_packages_content/{}".format(vnfd_id
),
298 headers_json
, None, 200, r_header_json
, "json")
301 r
= test_rest
.test("Show VNFD SOL005 text", "GET", "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id
),
302 headers_text
, None, 200, r_header_text
, "text")
305 makedirs("temp", exist_ok
=True)
306 tar
= tarfile
.open("temp/cirros_vnf.tar.gz", "w:gz")
307 tar
.add("cirros_vnf")
309 r
= test_rest
.test("Onboard VNFD step 3 replace with ZIP", "PUT", "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id
),
310 r_header_zip
, "@b./temp/cirros_vnf.tar.gz", 204, None, 0)
312 # vnfd SHOW OSM format
313 r
= test_rest
.test("Show VNFD OSM format", "GET", "/vnfpkgm/v1/vnf_packages_content/{}".format(vnfd_id
),
314 headers_json
, None, 200, r_header_json
, "json")
317 r
= test_rest
.test("Show VNFD SOL005 zip", "GET", "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id
),
318 headers_zip
, None, 200, r_header_zip
, "zip")
319 # vnfd SHOW descriptor
320 r
= test_rest
.test("Show VNFD descriptor", "GET", "/vnfpkgm/v1/vnf_packages/{}/vnfd".format(vnfd_id
),
321 headers_text
, None, 200, r_header_text
, "text")
323 r
= test_rest
.test("Show VNFD artifact", "GET", "/vnfpkgm/v1/vnf_packages/{}/artifacts/icons/cirros-64.png".format(vnfd_id
),
324 headers_text
, None, 200, r_header_octect
, "text")
327 r
= test_rest
.test("Delete VNFD SOL005 text", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id
),
328 headers_yaml
, None, 204, None, 0)
332 except Exception as e
:
334 logger
.error(test
+ " Exception: " + str(e
))
337 logger
.critical(test
+ " Exception: " + str(e
), exc_info
=True)