2 # -*- coding: utf-8 -*-
8 #from os.path import getsize, basename
9 #from hashlib import md5
15 from os
import makedirs
16 from copy
import deepcopy
18 __author__
= "Alfonso Tierno, alfonso.tiernosepulveda@telefonica.com"
19 __date__
= "$2018-03-01$"
21 version_date
= "Mar 2018"
25 print("Usage: ", sys
.argv
[0], "[options]")
26 print(" --version: prints current version")
27 print(" -f|--file FILE: file to be sent")
28 print(" -h|--help: shows this help")
29 print(" -u|--url URL: complete server URL")
30 print(" -s|--chunk-size SIZE: size of chunks, by default 1000")
31 print(" -t|--token TOKEN: Authorizaton token, previously obtained from server")
32 print(" -v|--verbose print debug information, can be used several times")
# Header constants shared by the test tables and the __main__ script.
#   r_header_*: a single Content-type entry; used mostly as the expected response headers,
#               and as the request headers when uploading descriptor/package content
#   headers_* : request headers used for ordinary requests
# NOTE(review): the opening lines of the four headers_* dicts were not visible in the reviewed
# extract; their names are taken from the places where they are used - confirm against the file.
r_header_json = {"Content-type": "application/json"}
headers_json = dict(r_header_json, Accept="application/json")

r_header_yaml = {"Content-type": "application/yaml"}
headers_yaml = dict(r_header_yaml, Accept="application/yaml")

r_header_text = {"Content-type": "text/plain"}
r_header_octect = {"Content-type": "application/octet-stream"}
headers_text = {"Accept": "text/plain"}

r_header_zip = {"Content-type": "application/zip"}
headers_zip = {"Accept": "application/zip"}
55 # test without authorization
# Requests issued before a token is obtained; each must be rejected with the listed code.
# Field order follows the parameters of the test() method:
#   name, description, method, url, request headers, payload,
#   expected codes, expected response headers, expected payload format
test_not_authorized_list = (
    (
        "NA1", "Invalid token",
        "GET", "/admin/v1/users",
        headers_json, None,
        401, r_header_json, "json",
    ),
    (
        "NA2", "Invalid URL",
        "POST", "/admin/v1/nonexist",
        headers_yaml, None,
        405, r_header_yaml, "yaml",
    ),
    (
        "NA3", "Invalid version",
        "DELETE", "/admin/v2/users",
        headers_yaml, None,
        405, r_header_yaml, "yaml",
    ),
)
# tests once authorized
# Requests for identifiers that do not exist; each must answer 404.
# Field order follows the parameters of the test() method:
#   name, description, method, url, request headers, payload,
#   expected codes, expected response headers, expected payload format
test_authorized_list = (
    (
        "AU1", "Invalid vnfd id",
        "GET", "/vnfpkgm/v1/vnf_packages/non-existing-id",
        headers_json, None,
        404, r_header_json, "json",
    ),
    (
        "AU2", "Invalid nsd id",
        "GET", "/nsd/v1/ns_descriptors/non-existing-id",
        headers_yaml, None,
        404, r_header_yaml, "yaml",
    ),
    (
        "AU3", "Invalid nsd id",
        "DELETE", "/nsd/v1/ns_descriptors_content/non-existing-id",
        headers_yaml, None,
        404, r_header_yaml, "yaml",
    ),
)
70 "schema_version": "1.0",
71 "schema_type": "No idea",
73 "description": "Descriptor name",
74 "vim_type": "openstack",
75 "vim_url": "http://localhost:/vim",
76 "vim_tenant_name": "vimTenant",
78 "vim_password": "password",
79 "config": {"config_param": 1}
# VIM account life cycle: create, rejected creations, list, show and delete.
# Field order follows the parameters of the test() method:
#   name, description, method, url, request headers, payload,
#   expected codes, expected response headers, expected payload format
# "{VIM1}" in a URL is replaced by test() with the id stored for the test named "VIM1".
# NOTE(review): the assignment line was not visible in the reviewed extract; the name is taken
# from the "for t in test_admin_list1" loop in __main__ - confirm against the file.
test_admin_list1 = (
    ("VIM1", "Create VIM", "POST", "/admin/v1/vim_accounts", headers_json, vim, (201, 204),
     {"Location": "/admin/v1/vim_accounts/", "Content-Type": "application/json"}, "json"),
    # NOTE(review): the next two entries pass headers_json (a dict) as expected_payload, which
    # test() documents as 0, 'yaml', 'json', 'text' or 'zip'; "json" was probably intended.
    # Left unchanged because it would add a check the server responses were never held to.
    ("VIM2", "Create VIM bad schema", "POST", "/admin/v1/vim_accounts", headers_json, vim_bad,
     422, None, headers_json),
    # was labelled "VIM2" as well, which made the two failures indistinguishable in the log
    ("VIM3", "Create VIM name repeated", "POST", "/admin/v1/vim_accounts", headers_json, vim,
     409, None, headers_json),
    ("VIM4", "Show VIMs", "GET", "/admin/v1/vim_accounts", headers_yaml, None,
     200, r_header_yaml, "yaml"),
    ("VIM5", "Show VIM", "GET", "/admin/v1/vim_accounts/{VIM1}", headers_yaml, None,
     200, r_header_yaml, "yaml"),
    ("VIM6", "Delete VIM", "DELETE", "/admin/v1/vim_accounts/{VIM1}", headers_yaml, None,
     202, None, 0),
)
class TestException(Exception):
    """Raised by the REST test client when a response does not match what the test expects."""
def __init__(self, url_base, header_base=None, verify=False):
    """
    Creates a REST test client bound to a base URL
    :param url_base: prefix that test() prepends to URLs not starting with "http"
    :param header_base: dict with the headers installed on the HTTP session. The same dict object is kept
        in self.header_base and is updated in place by set_header(). A new empty dict is used when omitted
    :param verify: stored in self.verify; test() passes it as the 'verify' argument of every request
    """
    # A "{}" default is created only once and shared by every instance; as set_header() updates the
    # session headers in place, headers such as Authorization would leak from one instance to the next
    if header_base is None:
        header_base = {}
    self.url_base = url_base
    self.header_base = header_base
    self.s = requests.session()
    self.s.headers = header_base
    # NOTE(review): the two assignments below were not visible in the reviewed extract; they are
    # restored because test() reads self.verify and self.test_ids - confirm against the file
    self.verify = verify
    # contains ID of tests obtained from Location response header. "" key contains last obtained id
    self.test_ids = {}
def set_header(self, header):
    """
    Merges extra headers into the ones held by the HTTP session (self.s.headers)
    :param header: dict with header names and values; an entry with an existing name replaces the old value
    :return: None
    """
    session_headers = self.s.headers
    session_headers.update(header)
111 def test(self
, name
, description
, method
, url
, headers
, payload
, expected_codes
, expected_headers
, expected_payload
):
Performs an http request and checks the http response code. Exits if it is different than allowed. It gets the
returned id, which can be used by the following tests in the URL with {name}, where name is the name of the test
115 :param name: short name of the test
116 :param description: description of the test
117 :param method: HTTP method: GET,PUT,POST,DELETE,...
118 :param url: complete URL or relative URL
119 :param headers: request headers to add to the base headers
120 :param payload: Can be a dict, transformed to json, a text or a file if starts with '@'
121 :param expected_codes: expected response codes, can be int, int tuple or int range
122 :param expected_headers: expected response headers, dict with key values
123 :param expected_payload: expected payload, 0 if empty, 'yaml', 'json', 'text', 'zip'
124 :return: requests response
128 self
.s
= requests
.session()
132 elif not url
.startswith("http"):
133 url
= self
.url_base
+ url
135 var_start
= url
.find("{") + 1
137 var_end
= url
.find("}", var_start
)
140 var_name
= url
[var_start
:var_end
]
141 if var_name
in self
.test_ids
:
142 url
= url
[:var_start
-1] + self
.test_ids
[var_name
] + url
[var_end
+1:]
143 var_start
+= len(self
.test_ids
[var_name
])
144 var_start
= url
.find("{", var_start
) + 1
146 if isinstance(payload
, str):
147 if payload
.startswith("@"):
149 file_name
= payload
[1:]
150 if payload
.startswith("@b"):
152 file_name
= payload
[2:]
153 with
open(file_name
, mode
) as f
:
155 elif isinstance(payload
, dict):
156 payload
= json
.dumps(payload
)
158 test
= "Test {} {} {} {}".format(name
, description
, method
, url
)
161 # if expected_payload == "zip":
163 r
= getattr(self
.s
, method
.lower())(url
, data
=payload
, headers
=headers
, verify
=self
.verify
, stream
=stream
)
164 logger
.debug("RX {}: {}".format(r
.status_code
, r
.text
))
168 if isinstance(expected_codes
, int):
169 expected_codes
= (expected_codes
,)
170 if r
.status_code
not in expected_codes
:
172 "Got status {}. Expected {}. {}".format(r
.status_code
, expected_codes
, r
.text
))
175 for header_key
, header_val
in expected_headers
.items():
176 if header_key
.lower() not in r
.headers
:
177 raise TestException("Header {} not present".format(header_key
))
178 if header_val
and header_val
.lower() not in r
.headers
[header_key
]:
179 raise TestException("Header {} does not contain {} but {}".format(header_key
, header_val
,
180 r
.headers
[header_key
]))
182 if expected_payload
is not None:
183 if expected_payload
== 0 and len(r
.content
) > 0:
184 raise TestException("Expected empty payload")
185 elif expected_payload
== "json":
188 except Exception as e
:
189 raise TestException("Expected json response payload, but got Exception {}".format(e
))
190 elif expected_payload
== "yaml":
192 yaml
.safe_load(r
.text
)
193 except Exception as e
:
194 raise TestException("Expected yaml response payload, but got Exception {}".format(e
))
195 elif expected_payload
== "zip":
196 if len(r
.content
) == 0:
197 raise TestException("Expected some response payload, but got empty")
199 # tar = tarfile.open(None, 'r:gz', fileobj=r.raw)
200 # for tarinfo in tar:
201 # tarname = tarinfo.name
203 # except Exception as e:
204 # raise TestException("Expected zip response payload, but got Exception {}".format(e))
205 elif expected_payload
== "text":
206 if len(r
.content
) == 0:
207 raise TestException("Expected some response payload, but got empty")
209 location
= r
.headers
.get("Location")
211 _id
= location
[location
.rfind("/") + 1:]
213 self
.test_ids
[name
] = str(_id
)
214 self
.test_ids
[""] = str(_id
) # last id
216 except TestException
as e
:
217 logger
.error("{} \nRX code{}: {}".format(e
, r
.status_code
, r
.text
))
220 logger
.error("Cannot open file {}".format(e
))
224 if __name__
== "__main__":
228 # Disable warnings from self-signed certificates.
229 requests
.packages
.urllib3
.disable_warnings()
231 logging
.basicConfig(format
="%(levelname)s %(message)s", level
=logging
.ERROR
)
232 logger
= logging
.getLogger('NBI')
233 # load parameters and configuration
234 opts
, args
= getopt
.getopt(sys
.argv
[1:], "hvu:p:",
235 ["url=", "user=", "password=", "help", "version", "verbose", "project=", "insecure"])
236 url
= "https://localhost:9999/osm"
237 user
= password
= project
= "admin"
243 print ("test version " + __version__
+ ' ' + version_date
)
245 elif o
in ("-v", "--verbose"):
247 elif o
in ("no-verbose"):
249 elif o
in ("-h", "--help"):
254 elif o
in ("-u", "--user"):
256 elif o
in ("-p", "--password"):
258 elif o
in ("--project"):
260 elif o
in ("--insecure"):
263 assert False, "Unhandled option"
265 logger
.setLevel(logging
.WARNING
)
267 logger
.setLevel(logging
.DEBUG
)
269 logger
.setLevel(logging
.ERROR
)
271 test_rest
= TestRest(url
)
273 # tests without authorization
274 for t
in test_not_authorized_list
:
278 r
= test_rest
.test("token1", "Obtain token", "POST", "/admin/v1/tokens", headers_json
,
279 {"username": user
, "password": password
, "project_id": project
},
280 (200, 201), {"Content-Type": "application/json"}, "json")
282 token
= response
["id"]
283 test_rest
.set_header({"Authorization": "Bearer {}".format(token
)})
285 # tests once authorized
286 for t
in test_authorized_list
:
290 for t
in test_admin_list1
:
294 r
= test_rest
.test("VNFD1", "Onboard VNFD step 1", "POST", "/vnfpkgm/v1/vnf_packages", headers_json
, None,
295 201, {"Location": "/vnfpkgm/v1/vnf_packages/", "Content-Type": "application/json"}, "json")
296 location
= r
.headers
["Location"]
297 vnfd_id
= location
[location
.rfind("/")+1:]
298 # print(location, vnfd_id)
301 r
= test_rest
.test("VNFD2", "Onboard VNFD step 2 as TEXT", "PUT", "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id
),
302 r_header_text
, "@./cirros_vnf/cirros_vnfd.yaml", 204, None, 0)
304 # vnfd SHOW OSM format
305 r
= test_rest
.test("VNFD3", "Show VNFD OSM format", "GET", "/vnfpkgm/v1/vnf_packages_content/{}".format(vnfd_id
),
306 headers_json
, None, 200, r_header_json
, "json")
309 r
= test_rest
.test("VNFD4", "Show VNFD SOL005 text", "GET", "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id
),
310 headers_text
, None, 200, r_header_text
, "text")
313 makedirs("temp", exist_ok
=True)
314 tar
= tarfile
.open("temp/cirros_vnf.tar.gz", "w:gz")
315 tar
.add("cirros_vnf")
317 r
= test_rest
.test("VNFD5", "Onboard VNFD step 3 replace with ZIP", "PUT", "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id
),
318 r_header_zip
, "@b./temp/cirros_vnf.tar.gz", 204, None, 0)
320 # vnfd SHOW OSM format
321 r
= test_rest
.test("VNFD6", "Show VNFD OSM format", "GET", "/vnfpkgm/v1/vnf_packages_content/{}".format(vnfd_id
),
322 headers_json
, None, 200, r_header_json
, "json")
325 r
= test_rest
.test("VNFD7", "Show VNFD SOL005 zip", "GET", "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id
),
326 headers_zip
, None, 200, r_header_zip
, "zip")
327 # vnfd SHOW descriptor
328 r
= test_rest
.test("VNFD8", "Show VNFD descriptor", "GET", "/vnfpkgm/v1/vnf_packages/{}/vnfd".format(vnfd_id
),
329 headers_text
, None, 200, r_header_text
, "text")
331 r
= test_rest
.test("VNFD9", "Show VNFD artifact", "GET", "/vnfpkgm/v1/vnf_packages/{}/artifacts/icons/cirros-64.png".format(vnfd_id
),
332 headers_text
, None, 200, r_header_octect
, "text")
335 # r = test_rest.test("VNFD10", "Delete VNFD SOL005 text", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id),
336 # headers_yaml, None, 204, None, 0)
339 r
= test_rest
.test("NSD1", "Onboard NSD step 1", "POST", "/nsd/v1/ns_descriptors", headers_json
, None,
340 201, {"Location": "/nsd/v1/ns_descriptors/", "Content-Type": "application/json"}, "json")
341 location
= r
.headers
["Location"]
342 nsd_id
= location
[location
.rfind("/")+1:]
343 # print(location, nsd_id)
346 r
= test_rest
.test("NSD2", "Onboard NSD with missing vnfd", "PUT", "/nsd/v1/ns_descriptors/{}/nsd_content?constituent-vnfd.0.vnfd-id-ref=NONEXISTING-VNFD".format(nsd_id
),
347 r_header_text
, "@./cirros_ns/cirros_nsd.yaml", 409, r_header_yaml
, "yaml")
350 # r = test_rest.test("VNFD5", "Onboard VNFD step 3 replace with ZIP", "PUT", "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
351 # r_header_zip, "@b./temp/cirros_vnf.tar.gz", 204, None, 0)
353 r
= test_rest
.test("NSD2", "Onboard NSD step 2 as TEXT", "PUT", "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id
),
354 r_header_text
, "@./cirros_ns/cirros_nsd.yaml", 204, None, 0)
356 # nsd SHOW OSM format
357 r
= test_rest
.test("NSD3", "Show NSD OSM format", "GET", "/nsd/v1/ns_descriptors_content/{}".format(nsd_id
),
358 headers_json
, None, 200, r_header_json
, "json")
361 r
= test_rest
.test("NSD4", "Show NSD SOL005 text", "GET", "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id
),
362 headers_text
, None, 200, r_header_text
, "text")
365 makedirs("temp", exist_ok
=True)
366 tar
= tarfile
.open("temp/cirros_ns.tar.gz", "w:gz")
369 r
= test_rest
.test("NSD5", "Onboard NSD step 3 replace with ZIP", "PUT", "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id
),
370 r_header_zip
, "@b./temp/cirros_ns.tar.gz", 204, None, 0)
372 # nsd SHOW OSM format
373 r
= test_rest
.test("NSD6", "Show NSD OSM format", "GET", "/nsd/v1/ns_descriptors_content/{}".format(nsd_id
),
374 headers_json
, None, 200, r_header_json
, "json")
377 r
= test_rest
.test("NSD7", "Show NSD SOL005 zip", "GET", "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id
),
378 headers_zip
, None, 200, r_header_zip
, "zip")
380 # nsd SHOW descriptor
381 r
= test_rest
.test("NSD8", "Show NSD descriptor", "GET", "/nsd/v1/ns_descriptors/{}/nsd".format(nsd_id
),
382 headers_text
, None, 200, r_header_text
, "text")
384 r
= test_rest
.test("NSD9", "Show NSD artifact", "GET", "/nsd/v1/ns_descriptors/{}/artifacts/icons/osm_2x.png".format(nsd_id
),
385 headers_text
, None, 200, r_header_octect
, "text")
388 r
= test_rest
.test("VNFD10", "Delete VNFD conflict", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id
),
389 headers_yaml
, None, 409, r_header_yaml
, "yaml")
392 r
= test_rest
.test("NSD10", "Delete NSD SOL005 text", "DELETE", "/nsd/v1/ns_descriptors/{}".format(nsd_id
),
393 headers_yaml
, None, 204, None, 0)
396 r
= test_rest
.test("VNFD10", "Delete VNFD SOL005 text", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id
),
397 headers_yaml
, None, 204, None, 0)
402 except Exception as e
:
404 logger
.error(test
+ " Exception: " + str(e
))
407 logger
.critical(test
+ " Exception: " + str(e
), exc_info
=True)