2 # -*- coding: utf-8 -*-
12 from os
import makedirs
14 __author__
= "Alfonso Tierno, alfonso.tiernosepulveda@telefonica.com"
15 __date__
= "$2018-03-01$"
17 version_date
= "Mar 2018"
21 print("Usage: ", sys
.argv
[0], "[options]")
22 print(" --version: prints current version")
23 print(" -f|--file FILE: file to be sent")
24 print(" -h|--help: shows this help")
25 print(" -u|--url URL: complete server URL")
26 print(" -s|--chunk-size SIZE: size of chunks, by default 1000")
27 print(" -t|--token TOKEN: Authorizaton token, previously obtained from server")
28 print(" -v|--verbose print debug information, can be used several times")
32 r_header_json
= {"Content-type": "application/json"}
34 "Content-type": "application/json",
35 "Accept": "application/json",
37 r_header_yaml
= {"Content-type": "application/yaml"}
39 "Content-type": "application/yaml",
40 "Accept": "application/yaml",
42 r_header_text
= {"Content-type": "text/plain"}
43 r_header_octect
= {"Content-type": "application/octet-stream"}
45 "Accept": "text/plain",
47 r_header_zip
= {"Content-type": "application/zip"}
49 "Accept": "application/zip",
51 # test without authorization
52 test_not_authorized_list
= (
53 ("NA1", "Invalid token", "GET", "/admin/v1/users", headers_json
, None, 401, r_header_json
, "json"),
54 ("NA2", "Invalid URL", "POST", "/admin/v1/nonexist", headers_yaml
, None, 405, r_header_yaml
, "yaml"),
55 ("NA3", "Invalid version", "DELETE", "/admin/v2/users", headers_yaml
, None, 405, r_header_yaml
, "yaml"),
58 # test ones authorized
59 test_authorized_list
= (
60 ("AU1", "Invalid vnfd id", "GET", "/vnfpkgm/v1/vnf_packages/non-existing-id",
61 headers_json
, None, 404, r_header_json
, "json"),
62 ("AU2", "Invalid nsd id", "GET", "/nsd/v1/ns_descriptors/non-existing-id",
63 headers_yaml
, None, 404, r_header_yaml
, "yaml"),
64 ("AU3", "Invalid nsd id", "DELETE", "/nsd/v1/ns_descriptors_content/non-existing-id",
65 headers_yaml
, None, 404, r_header_yaml
, "yaml"),
69 "schema_version": "1.0",
70 "schema_type": "No idea",
72 "description": "Descriptor name",
73 "vim_type": "openstack",
74 "vim_url": "http://localhost:/vim",
75 "vim_tenant_name": "vimTenant",
77 "vim_password": "password",
78 "config": {"config_param": 1}
85 ("VIM1", "Create VIM", "POST", "/admin/v1/vim_accounts", headers_json
, vim
, (201, 204),
86 {"Location": "/admin/v1/vim_accounts/", "Content-Type": "application/json"}, "json"),
87 ("VIM2", "Create VIM bad schema", "POST", "/admin/v1/vim_accounts", headers_json
, vim_bad
, 422, None, headers_json
),
88 ("VIM2", "Create VIM name repeated", "POST", "/admin/v1/vim_accounts", headers_json
, vim
, 409, None, headers_json
),
89 ("VIM4", "Show VIMs", "GET", "/admin/v1/vim_accounts", headers_yaml
, None, 200, r_header_yaml
, "yaml"),
90 ("VIM5", "Show VIM", "GET", "/admin/v1/vim_accounts/{VIM1}", headers_yaml
, None, 200, r_header_yaml
, "yaml"),
91 ("VIM6", "Delete VIM", "DELETE", "/admin/v1/vim_accounts/{VIM1}", headers_yaml
, None, 202, None, 0),
95 class TestException(Exception):
100 def __init__(self
, url_base
, header_base
={}, verify
=False):
101 self
.url_base
= url_base
102 self
.header_base
= header_base
103 self
.s
= requests
.session()
104 self
.s
.headers
= header_base
106 # contains ID of tests obtained from Location response header. "" key contains last obtained id
109 def set_header(self
, header
):
110 self
.s
.headers
.update(header
)
112 def test(self
, name
, description
, method
, url
, headers
, payload
, expected_codes
, expected_headers
,
115 Performs an http request and check http code response. Exit if different than allowed. It get the returned id
116 that can be used by following test in the URL with {name} where name is the name of the test
117 :param name: short name of the test
118 :param description: description of the test
119 :param method: HTTP method: GET,PUT,POST,DELETE,...
120 :param url: complete URL or relative URL
121 :param headers: request headers to add to the base headers
122 :param payload: Can be a dict, transformed to json, a text or a file if starts with '@'
123 :param expected_codes: expected response codes, can be int, int tuple or int range
124 :param expected_headers: expected response headers, dict with key values
125 :param expected_payload: expected payload, 0 if empty, 'yaml', 'json', 'text', 'zip'
126 :return: requests response
130 self
.s
= requests
.session()
134 elif not url
.startswith("http"):
135 url
= self
.url_base
+ url
137 var_start
= url
.find("{") + 1
139 var_end
= url
.find("}", var_start
)
142 var_name
= url
[var_start
:var_end
]
143 if var_name
in self
.test_ids
:
144 url
= url
[:var_start
-1] + self
.test_ids
[var_name
] + url
[var_end
+1:]
145 var_start
+= len(self
.test_ids
[var_name
])
146 var_start
= url
.find("{", var_start
) + 1
148 if isinstance(payload
, str):
149 if payload
.startswith("@"):
151 file_name
= payload
[1:]
152 if payload
.startswith("@b"):
154 file_name
= payload
[2:]
155 with
open(file_name
, mode
) as f
:
157 elif isinstance(payload
, dict):
158 payload
= json
.dumps(payload
)
160 test
= "Test {} {} {} {}".format(name
, description
, method
, url
)
163 # if expected_payload == "zip":
165 r
= getattr(self
.s
, method
.lower())(url
, data
=payload
, headers
=headers
, verify
=self
.verify
, stream
=stream
)
166 logger
.debug("RX {}: {}".format(r
.status_code
, r
.text
))
170 if isinstance(expected_codes
, int):
171 expected_codes
= (expected_codes
,)
172 if r
.status_code
not in expected_codes
:
174 "Got status {}. Expected {}. {}".format(r
.status_code
, expected_codes
, r
.text
))
177 for header_key
, header_val
in expected_headers
.items():
178 if header_key
.lower() not in r
.headers
:
179 raise TestException("Header {} not present".format(header_key
))
180 if header_val
and header_val
.lower() not in r
.headers
[header_key
]:
181 raise TestException("Header {} does not contain {} but {}".format(header_key
, header_val
,
182 r
.headers
[header_key
]))
184 if expected_payload
is not None:
185 if expected_payload
== 0 and len(r
.content
) > 0:
186 raise TestException("Expected empty payload")
187 elif expected_payload
== "json":
190 except Exception as e
:
191 raise TestException("Expected json response payload, but got Exception {}".format(e
))
192 elif expected_payload
== "yaml":
194 yaml
.safe_load(r
.text
)
195 except Exception as e
:
196 raise TestException("Expected yaml response payload, but got Exception {}".format(e
))
197 elif expected_payload
== "zip":
198 if len(r
.content
) == 0:
199 raise TestException("Expected some response payload, but got empty")
201 # tar = tarfile.open(None, 'r:gz', fileobj=r.raw)
202 # for tarinfo in tar:
203 # tarname = tarinfo.name
205 # except Exception as e:
206 # raise TestException("Expected zip response payload, but got Exception {}".format(e))
207 elif expected_payload
== "text":
208 if len(r
.content
) == 0:
209 raise TestException("Expected some response payload, but got empty")
211 location
= r
.headers
.get("Location")
213 _id
= location
[location
.rfind("/") + 1:]
215 self
.test_ids
[name
] = str(_id
)
216 self
.test_ids
[""] = str(_id
) # last id
218 except TestException
as e
:
219 logger
.error("{} \nRX code{}: {}".format(e
, r
.status_code
, r
.text
))
222 logger
.error("Cannot open file {}".format(e
))
226 if __name__
== "__main__":
230 # Disable warnings from self-signed certificates.
231 requests
.packages
.urllib3
.disable_warnings()
233 logging
.basicConfig(format
="%(levelname)s %(message)s", level
=logging
.ERROR
)
234 logger
= logging
.getLogger('NBI')
235 # load parameters and configuration
236 opts
, args
= getopt
.getopt(sys
.argv
[1:], "hvu:p:",
237 ["url=", "user=", "password=", "help", "version", "verbose", "project=", "insecure"])
238 url
= "https://localhost:9999/osm"
239 user
= password
= project
= "admin"
245 print("test version " + __version__
+ ' ' + version_date
)
247 elif o
in ("-v", "--verbose"):
249 elif o
== "no-verbose":
251 elif o
in ("-h", "--help"):
256 elif o
in ("-u", "--user"):
258 elif o
in ("-p", "--password"):
260 elif o
== "--project":
262 elif o
== "--insecure":
265 assert False, "Unhandled option"
267 logger
.setLevel(logging
.WARNING
)
269 logger
.setLevel(logging
.DEBUG
)
271 logger
.setLevel(logging
.ERROR
)
273 test_rest
= TestRest(url
)
275 # tests without authorization
276 for t
in test_not_authorized_list
:
280 r
= test_rest
.test("token1", "Obtain token", "POST", "/admin/v1/tokens", headers_json
,
281 {"username": user
, "password": password
, "project_id": project
},
282 (200, 201), {"Content-Type": "application/json"}, "json")
284 token
= response
["id"]
285 test_rest
.set_header({"Authorization": "Bearer {}".format(token
)})
287 # tests once authorized
288 for t
in test_authorized_list
:
292 for t
in test_admin_list1
:
296 r
= test_rest
.test("VNFD1", "Onboard VNFD step 1", "POST", "/vnfpkgm/v1/vnf_packages", headers_json
, None,
297 201, {"Location": "/vnfpkgm/v1/vnf_packages/", "Content-Type": "application/json"}, "json")
298 location
= r
.headers
["Location"]
299 vnfd_id
= location
[location
.rfind("/")+1:]
300 # print(location, vnfd_id)
303 r
= test_rest
.test("VNFD2", "Onboard VNFD step 2 as TEXT", "PUT",
304 "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id
),
305 r_header_text
, "@./cirros_vnf/cirros_vnfd.yaml", 204, None, 0)
307 # vnfd SHOW OSM format
308 r
= test_rest
.test("VNFD3", "Show VNFD OSM format", "GET",
309 "/vnfpkgm/v1/vnf_packages_content/{}".format(vnfd_id
),
310 headers_json
, None, 200, r_header_json
, "json")
313 r
= test_rest
.test("VNFD4", "Show VNFD SOL005 text", "GET",
314 "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id
),
315 headers_text
, None, 200, r_header_text
, "text")
318 makedirs("temp", exist_ok
=True)
319 tar
= tarfile
.open("temp/cirros_vnf.tar.gz", "w:gz")
320 tar
.add("cirros_vnf")
322 r
= test_rest
.test("VNFD5", "Onboard VNFD step 3 replace with ZIP", "PUT",
323 "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id
),
324 r_header_zip
, "@b./temp/cirros_vnf.tar.gz", 204, None, 0)
326 # vnfd SHOW OSM format
327 r
= test_rest
.test("VNFD6", "Show VNFD OSM format", "GET",
328 "/vnfpkgm/v1/vnf_packages_content/{}".format(vnfd_id
),
329 headers_json
, None, 200, r_header_json
, "json")
332 r
= test_rest
.test("VNFD7", "Show VNFD SOL005 zip", "GET",
333 "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id
),
334 headers_zip
, None, 200, r_header_zip
, "zip")
335 # vnfd SHOW descriptor
336 r
= test_rest
.test("VNFD8", "Show VNFD descriptor", "GET",
337 "/vnfpkgm/v1/vnf_packages/{}/vnfd".format(vnfd_id
),
338 headers_text
, None, 200, r_header_text
, "text")
340 r
= test_rest
.test("VNFD9", "Show VNFD artifact", "GET",
341 "/vnfpkgm/v1/vnf_packages/{}/artifacts/icons/cirros-64.png".format(vnfd_id
),
342 headers_text
, None, 200, r_header_octect
, "text")
345 # r = test_rest.test("VNFD10", "Delete VNFD SOL005 text", "DELETE",
346 # "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id),
347 # headers_yaml, None, 204, None, 0)
350 r
= test_rest
.test("NSD1", "Onboard NSD step 1", "POST", "/nsd/v1/ns_descriptors", headers_json
, None,
351 201, {"Location": "/nsd/v1/ns_descriptors/", "Content-Type": "application/json"}, "json")
352 location
= r
.headers
["Location"]
353 nsd_id
= location
[location
.rfind("/")+1:]
354 # print(location, nsd_id)
357 r
= test_rest
.test("NSD2", "Onboard NSD with missing vnfd", "PUT",
358 "/nsd/v1/ns_descriptors/{}/nsd_content?constituent-vnfd.0.vnfd-id-ref"
359 "=NONEXISTING-VNFD".format(nsd_id
),
360 r_header_text
, "@./cirros_ns/cirros_nsd.yaml", 409, r_header_yaml
, "yaml")
363 # r = test_rest.test("VNFD5", "Onboard VNFD step 3 replace with ZIP", "PUT",
364 # "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
365 # r_header_zip, "@b./temp/cirros_vnf.tar.gz", 204, None, 0)
367 r
= test_rest
.test("NSD2", "Onboard NSD step 2 as TEXT", "PUT",
368 "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id
),
369 r_header_text
, "@./cirros_ns/cirros_nsd.yaml", 204, None, 0)
371 # nsd SHOW OSM format
372 r
= test_rest
.test("NSD3", "Show NSD OSM format", "GET", "/nsd/v1/ns_descriptors_content/{}".format(nsd_id
),
373 headers_json
, None, 200, r_header_json
, "json")
376 r
= test_rest
.test("NSD4", "Show NSD SOL005 text", "GET",
377 "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id
),
378 headers_text
, None, 200, r_header_text
, "text")
381 makedirs("temp", exist_ok
=True)
382 tar
= tarfile
.open("temp/cirros_ns.tar.gz", "w:gz")
385 r
= test_rest
.test("NSD5", "Onboard NSD step 3 replace with ZIP", "PUT",
386 "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id
),
387 r_header_zip
, "@b./temp/cirros_ns.tar.gz", 204, None, 0)
389 # nsd SHOW OSM format
390 r
= test_rest
.test("NSD6", "Show NSD OSM format", "GET", "/nsd/v1/ns_descriptors_content/{}".format(nsd_id
),
391 headers_json
, None, 200, r_header_json
, "json")
394 r
= test_rest
.test("NSD7", "Show NSD SOL005 zip", "GET", "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id
),
395 headers_zip
, None, 200, r_header_zip
, "zip")
397 # nsd SHOW descriptor
398 r
= test_rest
.test("NSD8", "Show NSD descriptor", "GET", "/nsd/v1/ns_descriptors/{}/nsd".format(nsd_id
),
399 headers_text
, None, 200, r_header_text
, "text")
401 r
= test_rest
.test("NSD9", "Show NSD artifact", "GET",
402 "/nsd/v1/ns_descriptors/{}/artifacts/icons/osm_2x.png".format(nsd_id
),
403 headers_text
, None, 200, r_header_octect
, "text")
406 r
= test_rest
.test("VNFD10", "Delete VNFD conflict", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id
),
407 headers_yaml
, None, 409, r_header_yaml
, "yaml")
410 r
= test_rest
.test("NSD10", "Delete NSD SOL005 text", "DELETE", "/nsd/v1/ns_descriptors/{}".format(nsd_id
),
411 headers_yaml
, None, 204, None, 0)
414 r
= test_rest
.test("VNFD10", "Delete VNFD SOL005 text", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id
),
415 headers_yaml
, None, 204, None, 0)
419 except Exception as e
:
421 logger
.error(test
+ " Exception: " + str(e
))
424 logger
.critical(test
+ " Exception: " + str(e
), exc_info
=True)