fix flake8 tests
[osm/NBI.git] / osm_nbi / tests / test.py
1 #! /usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 import getopt
5 import sys
6 import requests
7 import json
8 import logging
9 import yaml
10 # import json
11 import tarfile
12 from os import makedirs
13
14 __author__ = "Alfonso Tierno, alfonso.tiernosepulveda@telefonica.com"
15 __date__ = "$2018-03-01$"
16 __version__ = "0.1"
17 version_date = "Mar 2018"
18
19
def usage():
    """Print the command line help.

    The options listed here mirror the ones parsed with getopt in the
    ``__main__`` section of this script.
    """
    print("Usage: ", sys.argv[0], "[options]")
    print("      --version: prints current version")
    print("      -h|--help: shows this help")
    print("      --url URL: complete server URL, by default https://localhost:9999/osm")
    print("      -u|--user USER: user name used to obtain the token, by default admin")
    print("      -p|--password PASSWORD: password used to obtain the token, by default admin")
    print("      --project PROJECT: project used to obtain the token, by default admin")
    print("      --insecure: do not verify the server certificate")
    print("      -v|--verbose: print debug information, can be used several times")
30
31
# Header dictionaries.
#   r_header_*: headers expected in a response (also used as request Content-type when uploading)
#   headers_*:  headers sent with a request
r_header_json = {"Content-type": "application/json"}
headers_json = {
    "Content-type": "application/json",
    "Accept": "application/json",
}
r_header_yaml = {"Content-type": "application/yaml"}
headers_yaml = {
    "Content-type": "application/yaml",
    "Accept": "application/yaml",
}
r_header_text = {"Content-type": "text/plain"}
r_header_octect = {"Content-type": "application/octet-stream"}
headers_text = {
    "Accept": "text/plain",
}
r_header_zip = {"Content-type": "application/zip"}
headers_zip = {
    "Accept": "application/zip",
}

# Every test entry below is a tuple with the positional arguments of TestRest.test():
#   (name, description, method, url, headers, payload, expected_codes, expected_headers, expected_payload)

# tests without authorization
test_not_authorized_list = (
    ("NA1", "Invalid token", "GET", "/admin/v1/users", headers_json, None, 401, r_header_json, "json"),
    ("NA2", "Invalid URL", "POST", "/admin/v1/nonexist", headers_yaml, None, 405, r_header_yaml, "yaml"),
    ("NA3", "Invalid version", "DELETE", "/admin/v2/users", headers_yaml, None, 405, r_header_yaml, "yaml"),
)

# tests once authorized
test_authorized_list = (
    ("AU1", "Invalid vnfd id", "GET", "/vnfpkgm/v1/vnf_packages/non-existing-id",
     headers_json, None, 404, r_header_json, "json"),
    ("AU2", "Invalid nsd id", "GET", "/nsd/v1/ns_descriptors/non-existing-id",
     headers_yaml, None, 404, r_header_yaml, "yaml"),
    ("AU3", "Invalid nsd id", "DELETE", "/nsd/v1/ns_descriptors_content/non-existing-id",
     headers_yaml, None, 404, r_header_yaml, "yaml"),
)

# VIM account used as payload by the admin tests
vim = {
    "schema_version": "1.0",
    "schema_type": "No idea",
    "name": "myVim",
    "description": "Descriptor name",
    "vim_type": "openstack",
    "vim_url": "http://localhost:/vim",
    "vim_tenant_name": "vimTenant",
    "vim_user": "user",
    "vim_password": "password",
    "config": {"config_param": 1}
}

# Same VIM account without the "name" key, used for the bad schema test
vim_bad = vim.copy()
vim_bad.pop("name")

# The {VIM1} placeholder in the URLs is replaced by the id obtained from the Location header of test VIM1.
# VIM2 and VIM3 request json (headers_json), so the last field is the payload kind "json"; a dict in that
# position matches none of the payload kinds and would disable the payload check.
test_admin_list1 = (
    ("VIM1", "Create VIM", "POST", "/admin/v1/vim_accounts", headers_json, vim, (201, 204),
     {"Location": "/admin/v1/vim_accounts/", "Content-Type": "application/json"}, "json"),
    ("VIM2", "Create VIM bad schema", "POST", "/admin/v1/vim_accounts", headers_json, vim_bad, 422, None, "json"),
    ("VIM3", "Create VIM name repeated", "POST", "/admin/v1/vim_accounts", headers_json, vim, 409, None, "json"),
    ("VIM4", "Show VIMs", "GET", "/admin/v1/vim_accounts", headers_yaml, None, 200, r_header_yaml, "yaml"),
    ("VIM5", "Show VIM", "GET", "/admin/v1/vim_accounts/{VIM1}", headers_yaml, None, 200, r_header_yaml, "yaml"),
    ("VIM6", "Delete VIM", "DELETE", "/admin/v1/vim_accounts/{VIM1}", headers_yaml, None, 202, None, 0),
)
93
94
class TestException(Exception):
    """Raised when a response does not match what a test expects (status code, headers or payload)."""
97
98
class TestRest:
    """
    Small REST client that performs requests against a base URL and checks the response.
    It keeps a requests session and the ids obtained from the Location response headers, so that
    following tests can reference them in the URL as {test-name}.
    """

    def __init__(self, url_base, header_base=None, verify=False):
        """
        :param url_base: URL prepended to the relative URLs passed to test()
        :param header_base: optional dict with headers sent in every request
        :param verify: passed to requests as 'verify' (server certificate verification)
        """
        self.url_base = url_base
        # copy it, so that set_header() neither alters the caller dictionary nor a default shared among instances
        self.header_base = dict(header_base) if header_base else {}
        self.s = requests.session()
        self.s.headers = self.header_base
        self.verify = verify
        # contains ID of tests obtained from Location response header. "" key contains last obtained id
        self.test_ids = {}

    def set_header(self, header):
        """
        Adds or replaces headers that are sent in every following request
        :param header: dict with the headers
        """
        self.s.headers.update(header)

    def _expand_url(self, url):
        """
        Replaces every {name} of the URL by the id stored for the test 'name'. Unknown names are left untouched
        :param url: URL that can contain {name} placeholders
        :return: the URL with the known placeholders replaced
        """
        search_from = 0
        while True:
            brace_open = url.find("{", search_from)
            if brace_open == -1:
                break
            brace_close = url.find("}", brace_open + 1)
            if brace_close == -1:
                break
            var_name = url[brace_open + 1:brace_close]
            if var_name in self.test_ids:
                value = self.test_ids[var_name]
                url = url[:brace_open] + value + url[brace_close + 1:]
                # continue just after the inserted id, so a placeholder that immediately follows is not skipped
                search_from = brace_open + len(value)
            else:
                search_from = brace_open + 1
        return url

    @staticmethod
    def _load_payload(payload):
        """
        Converts the payload into what is sent in the request body
        :param payload: dict (dumped to json), text, '@file' (file read as text) or '@bfile' (file read as binary)
        :return: the body to send
        """
        if isinstance(payload, dict):
            return json.dumps(payload)
        if isinstance(payload, str) and payload.startswith("@"):
            # NOTE: '@b' selects binary mode, so a text file whose name starts with 'b' must be given as '@./b...'
            if payload.startswith("@b"):
                mode, file_name = "rb", payload[2:]
            else:
                mode, file_name = "r", payload[1:]
            with open(file_name, mode) as f:
                return f.read()
        return payload

    @staticmethod
    def _check_payload(r, expected_payload):
        """
        Checks the response body against the expected kind of payload
        :param r: requests response
        :param expected_payload: 0 if empty, 'yaml', 'json', 'text', 'zip'. Any other value is not checked
        :raises TestException: if the body is not as expected
        """
        if expected_payload == 0:
            if len(r.content) > 0:
                raise TestException("Expected empty payload")
        elif expected_payload == "json":
            try:
                r.json()
            except Exception as e:
                raise TestException("Expected json response payload, but got Exception {}".format(e))
        elif expected_payload == "yaml":
            try:
                yaml.safe_load(r.text)
            except Exception as e:
                raise TestException("Expected yaml response payload, but got Exception {}".format(e))
        elif expected_payload in ("zip", "text"):
            # only checks that something is received; the content is not parsed
            if len(r.content) == 0:
                raise TestException("Expected some response payload, but got empty")

    def test(self, name, description, method, url, headers, payload, expected_codes, expected_headers,
             expected_payload):
        """
        Performs an http request and check http code response. Exit if different than allowed. It get the returned id
        that can be used by following test in the URL with {name} where name is the name of the test
        :param name:  short name of the test
        :param description:  description of the test
        :param method: HTTP method: GET,PUT,POST,DELETE,...
        :param url: complete URL or relative URL
        :param headers: request headers to add to the base headers
        :param payload: Can be a dict, transformed to json, a text or a file if starts with '@'
        :param expected_codes: expected response codes, can be int, int tuple or int range
        :param expected_headers: expected response headers, dict with key values
        :param expected_payload: expected payload, 0 if empty, 'yaml', 'json', 'text', 'zip'
        :return: requests response
        """
        # Same logger the script configures; obtained here so that the class does not depend on a global
        # that only exists when this file is run as a script
        log = logging.getLogger("NBI")
        r = None
        try:
            if not self.s:
                self.s = requests.session()
            # URL
            if not url:
                url = self.url_base
            elif not url.startswith("http"):
                url = self.url_base + url
            url = self._expand_url(url)

            if payload:
                payload = self._load_payload(payload)

            test = "Test {} {} {} {}".format(name, description, method, url)
            log.warning(test)
            r = getattr(self.s, method.lower())(url, data=payload, headers=headers, verify=self.verify, stream=False)
            log.debug("RX {}: {}".format(r.status_code, r.text))

            # check response
            if expected_codes:
                if isinstance(expected_codes, int):
                    expected_codes = (expected_codes,)
                if r.status_code not in expected_codes:
                    raise TestException(
                        "Got status {}. Expected {}. {}".format(r.status_code, expected_codes, r.text))

            if expected_headers:
                for header_key, header_val in expected_headers.items():
                    if header_key.lower() not in r.headers:
                        raise TestException("Header {} not present".format(header_key))
                    # compare both sides in lower case; the expected value only needs to be contained
                    if header_val and header_val.lower() not in r.headers[header_key].lower():
                        raise TestException("Header {} does not contain {} but {}".format(header_key, header_val,
                                                                                         r.headers[header_key]))

            if expected_payload is not None:
                self._check_payload(r, expected_payload)

            # store the id, last part of the Location header, to be used by following tests
            location = r.headers.get("Location")
            if location:
                _id = location[location.rfind("/") + 1:]
                if _id:
                    self.test_ids[name] = str(_id)
                    self.test_ids[""] = str(_id)  # last id
            return r
        except TestException as e:
            # TestException is only raised once the response has been received, so r is available
            log.error("{} \nRX code{}: {}".format(e, r.status_code, r.text))
            sys.exit(1)
        except IOError as e:
            log.error("Cannot open file {}".format(e))
            sys.exit(1)
224
225
if __name__ == "__main__":
    # Runs the whole test sequence against an NBI server: unauthorized tests, token retrieval, authorized
    # and admin tests, and then VNFD and NSD onboarding, show and delete. Exits with a non-zero status
    # when something fails.

    # Disable warnings from self-signed certificates.
    requests.packages.urllib3.disable_warnings()

    # The logger is created before the try block, so that it is always available in the exception handlers
    logging.basicConfig(format="%(levelname)s %(message)s", level=logging.ERROR)
    logger = logging.getLogger('NBI')
    try:
        # load parameters and configuration
        opts, args = getopt.getopt(sys.argv[1:], "hvu:p:",
                                   ["url=", "user=", "password=", "help", "version", "verbose", "no-verbose",
                                    "project=", "insecure"])
        url = "https://localhost:9999/osm"
        user = password = project = "admin"
        verbose = 0
        verify = True

        for o, a in opts:
            if o == "--version":
                print("test version " + __version__ + ' ' + version_date)
                sys.exit()
            elif o in ("-v", "--verbose"):
                verbose += 1
            elif o == "--no-verbose":
                verbose = -1
            elif o in ("-h", "--help"):
                usage()
                sys.exit()
            elif o == "--url":
                url = a
            elif o in ("-u", "--user"):
                user = a
            elif o in ("-p", "--password"):
                password = a
            elif o == "--project":
                project = a
            elif o == "--insecure":
                verify = False
            else:
                # getopt only returns declared options, so this is a declared option without a branch above
                raise ValueError("Unhandled option {}".format(o))

        # each -v increases the verbosity, --no-verbose reduces it. Test names are logged as warnings
        if verbose < 0:
            logger.setLevel(logging.ERROR)
        elif verbose == 0:
            logger.setLevel(logging.WARNING)
        elif verbose == 1:
            logger.setLevel(logging.INFO)
        else:
            logger.setLevel(logging.DEBUG)

        # certificate verification is enabled unless --insecure is given
        test_rest = TestRest(url, verify=verify)

        # tests without authorization
        for t in test_not_authorized_list:
            test_rest.test(*t)

        # get token
        r = test_rest.test("token1", "Obtain token", "POST", "/admin/v1/tokens", headers_json,
                           {"username": user, "password": password, "project_id": project},
                           (200, 201), {"Content-Type": "application/json"}, "json")
        response = r.json()
        token = response["id"]
        test_rest.set_header({"Authorization": "Bearer {}".format(token)})

        # tests once authorized
        for t in test_authorized_list:
            test_rest.test(*t)

        # tests admin
        for t in test_admin_list1:
            test_rest.test(*t)

        # vnfd CREATE
        r = test_rest.test("VNFD1", "Onboard VNFD step 1", "POST", "/vnfpkgm/v1/vnf_packages", headers_json, None,
                           201, {"Location": "/vnfpkgm/v1/vnf_packages/", "Content-Type": "application/json"}, "json")
        location = r.headers["Location"]
        vnfd_id = location[location.rfind("/")+1:]

        # vnfd UPLOAD test
        test_rest.test("VNFD2", "Onboard VNFD step 2 as TEXT", "PUT",
                       "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
                       r_header_text, "@./cirros_vnf/cirros_vnfd.yaml", 204, None, 0)

        # vnfd SHOW OSM format
        test_rest.test("VNFD3", "Show VNFD OSM format", "GET",
                       "/vnfpkgm/v1/vnf_packages_content/{}".format(vnfd_id),
                       headers_json, None, 200, r_header_json, "json")

        # vnfd SHOW text
        test_rest.test("VNFD4", "Show VNFD SOL005 text", "GET",
                       "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
                       headers_text, None, 200, r_header_text, "text")

        # vnfd UPLOAD ZIP
        makedirs("temp", exist_ok=True)
        # the context manager closes the tar file even if adding the folder fails
        with tarfile.open("temp/cirros_vnf.tar.gz", "w:gz") as tar:
            tar.add("cirros_vnf")
        test_rest.test("VNFD5", "Onboard VNFD step 3 replace with ZIP", "PUT",
                       "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
                       r_header_zip, "@b./temp/cirros_vnf.tar.gz", 204, None, 0)

        # vnfd SHOW OSM format
        test_rest.test("VNFD6", "Show VNFD OSM format", "GET",
                       "/vnfpkgm/v1/vnf_packages_content/{}".format(vnfd_id),
                       headers_json, None, 200, r_header_json, "json")

        # vnfd SHOW zip
        test_rest.test("VNFD7", "Show VNFD SOL005 zip", "GET",
                       "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
                       headers_zip, None, 200, r_header_zip, "zip")
        # vnfd SHOW descriptor
        test_rest.test("VNFD8", "Show VNFD descriptor", "GET",
                       "/vnfpkgm/v1/vnf_packages/{}/vnfd".format(vnfd_id),
                       headers_text, None, 200, r_header_text, "text")
        # vnfd SHOW artifact
        test_rest.test("VNFD9", "Show VNFD artifact", "GET",
                       "/vnfpkgm/v1/vnf_packages/{}/artifacts/icons/cirros-64.png".format(vnfd_id),
                       headers_text, None, 200, r_header_octect, "text")

        # nsd CREATE
        r = test_rest.test("NSD1", "Onboard NSD step 1", "POST", "/nsd/v1/ns_descriptors", headers_json, None,
                           201, {"Location": "/nsd/v1/ns_descriptors/", "Content-Type": "application/json"}, "json")
        location = r.headers["Location"]
        nsd_id = location[location.rfind("/")+1:]

        # nsd UPLOAD test, referencing a vnfd that does not exist
        test_rest.test("NSD2", "Onboard NSD with missing vnfd", "PUT",
                       "/nsd/v1/ns_descriptors/{}/nsd_content?constituent-vnfd.0.vnfd-id-ref"
                       "=NONEXISTING-VNFD".format(nsd_id),
                       r_header_text, "@./cirros_ns/cirros_nsd.yaml", 409, r_header_yaml, "yaml")

        test_rest.test("NSD2", "Onboard NSD step 2 as TEXT", "PUT",
                       "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
                       r_header_text, "@./cirros_ns/cirros_nsd.yaml", 204, None, 0)

        # nsd SHOW OSM format
        test_rest.test("NSD3", "Show NSD OSM format", "GET", "/nsd/v1/ns_descriptors_content/{}".format(nsd_id),
                       headers_json, None, 200, r_header_json, "json")

        # nsd SHOW text
        test_rest.test("NSD4", "Show NSD SOL005 text", "GET",
                       "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
                       headers_text, None, 200, r_header_text, "text")

        # nsd UPLOAD ZIP
        makedirs("temp", exist_ok=True)
        with tarfile.open("temp/cirros_ns.tar.gz", "w:gz") as tar:
            tar.add("cirros_ns")
        test_rest.test("NSD5", "Onboard NSD step 3 replace with ZIP", "PUT",
                       "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
                       r_header_zip, "@b./temp/cirros_ns.tar.gz", 204, None, 0)

        # nsd SHOW OSM format
        test_rest.test("NSD6", "Show NSD OSM format", "GET", "/nsd/v1/ns_descriptors_content/{}".format(nsd_id),
                       headers_json, None, 200, r_header_json, "json")

        # nsd SHOW zip
        test_rest.test("NSD7", "Show NSD SOL005 zip", "GET", "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
                       headers_zip, None, 200, r_header_zip, "zip")

        # nsd SHOW descriptor
        test_rest.test("NSD8", "Show NSD descriptor", "GET", "/nsd/v1/ns_descriptors/{}/nsd".format(nsd_id),
                       headers_text, None, 200, r_header_text, "text")
        # nsd SHOW artifact
        test_rest.test("NSD9", "Show NSD artifact", "GET",
                       "/nsd/v1/ns_descriptors/{}/artifacts/icons/osm_2x.png".format(nsd_id),
                       headers_text, None, 200, r_header_octect, "text")

        # vnfd DELETE, a conflict is expected at this point (the nsd has not been deleted yet)
        test_rest.test("VNFD10", "Delete VNFD conflict", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id),
                       headers_yaml, None, 409, r_header_yaml, "yaml")

        # nsd DELETE
        test_rest.test("NSD10", "Delete NSD SOL005 text", "DELETE", "/nsd/v1/ns_descriptors/{}".format(nsd_id),
                       headers_yaml, None, 204, None, 0)

        # vnfd DELETE
        test_rest.test("VNFD10", "Delete VNFD SOL005 text", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id),
                       headers_yaml, None, 204, None, 0)

        print("PASS")

    except getopt.GetoptError as e:
        logger.error("Invalid arguments: {}".format(e))
        usage()
        sys.exit(2)
    except Exception as e:
        # any unexpected error is a failure of the test run: report it and exit with a non-zero status
        logger.critical("Exception: " + str(e), exc_info=True)
        sys.exit(1)