sol005 packages upload implementation
[osm/NBI.git] / osm_nbi / test / test.py
1 #! /usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 import getopt
5 import sys
6 import requests
7 #import base64
8 #from os.path import getsize, basename
9 #from hashlib import md5
10 import json
11 import logging
12 import yaml
13 #import json
14 import tarfile
15 from os import makedirs
16 from copy import deepcopy
17
# Module metadata. __version__ and version_date are the two values printed
# by the "--version" command-line option.
__version__, version_date = "0.1", "Mar 2018"
__author__ = "Alfonso Tierno, alfonso.tiernosepulveda@telefonica.com"
__date__ = "$2018-03-01$"
22
23
def usage():
    """
    Print the command-line help of this script to stdout.

    The options listed are the ones the ``getopt`` call in the ``__main__``
    section of this file accepts; the defaults quoted are the ones assigned
    there before the options are processed.
    :return: None
    """
    print("Usage: ", sys.argv[0], "[options]")
    print("      --version: prints current version")
    print("      -h|--help: shows this help")
    print("      --url URL: complete server URL, by default https://localhost:9999/osm")
    print("      -u|--user USER: user name used to obtain the token, by default admin")
    print("      -p|--password PASSWORD: password used to obtain the token, by default admin")
    print("      --project PROJECT: project used to obtain the token, by default admin")
    print("      --insecure: do not verify the server certificate")
    print("      -v|--verbose print debug information, can be used several times")
    return
34
35
# Header dictionaries shared by the test cases of this file.
#   r_header_*: a single "Content-type" entry. Used as the expected response
#               headers and, for the upload requests, as the request headers.
#   headers_*:  request headers carrying an "Accept" entry (plus the matching
#               "Content-type" for json and yaml).
r_header_json = {"Content-type": "application/json"}
r_header_yaml = {"Content-type": "application/yaml"}
r_header_text = {"Content-type": "text/plain"}
r_header_octect = {"Content-type": "application/octet-stream"}
r_header_zip = {"Content-type": "application/zip"}

headers_json = dict(r_header_json, Accept="application/json")
headers_yaml = dict(r_header_yaml, Accept="application/yaml")
headers_text = dict(Accept="text/plain")
headers_zip = dict(Accept="application/zip")

# Each test case below is a tuple whose items are, in order, the arguments of
# TestRest.test():
#   (name, method, url, headers, payload, expected_codes, expected_headers, expected_payload)

# Requests sent before any token has been obtained
test_not_authorized_list = (
    ("Invalid token", "GET", "/admin/v1/users",
     headers_json, None, 401, r_header_json, "json"),
    ("Invalid URL", "POST", "/admin/v1/nonexist",
     headers_yaml, None, 405, r_header_yaml, "yaml"),
    ("Invalid version", "DELETE", "/admin/v2/users",
     headers_yaml, None, 405, r_header_yaml, "yaml"),
)

# Requests sent once the Authorization header has been set
test_authorized_list = (
    ("Invalid vnfd id", "GET", "/vnfpkgm/v1/vnf_packages/non-existing-id",
     headers_json, None, 404, r_header_json, "json"),
    ("Invalid nsd id", "GET", "/nsd/v1/ns_descriptors/non-existing-id",
     headers_yaml, None, 404, r_header_yaml, "yaml"),
    ("Invalid nsd id", "DELETE", "/nsd/v1/ns_descriptors_content/non-existing-id",
     headers_yaml, None, 404, r_header_yaml, "yaml"),
)
68
class TestException(Exception):
    """Raised when an HTTP response does not match what a test case expects."""
71
72
class TestRest:
    """
    Small HTTP test client built on one ``requests`` session.

    Each call to test() sends a single request and validates the status code,
    the headers and the payload of the response against the given expectations.
    A failed expectation is logged and terminates the process with exit code 1.
    """

    def __init__(self, url_base, header_base=None, verify=False):
        """
        :param url_base: base URL, prepended to the relative URLs given to test()
        :param header_base: dict installed as the session headers. It is updated in place by set_header().
            A new empty dict is created when omitted, so instances never share one
        :param verify: value passed as 'verify' to every request of the session
        """
        if header_base is None:
            header_base = {}
        self.url_base = url_base
        self.header_base = header_base
        self.s = requests.session()
        self.s.headers = header_base
        self.verify = verify

    def set_header(self, header):
        """
        Adds (or overwrites) headers sent with every following request
        :param header: dict with the headers to merge into the session headers
        :return: None
        """
        self.s.headers.update(header)

    @staticmethod
    def _read_payload_file(payload):
        """
        Reads the content of the file referenced by a '@file' (text) or '@bfile' (binary) payload
        :param payload: string starting with '@'
        :return: file content, str for '@' and bytes for '@b'
        """
        if payload.startswith("@b"):
            mode, file_name = "rb", payload[2:]
        else:
            mode, file_name = "r", payload[1:]
        with open(file_name, mode) as f:
            return f.read()

    @staticmethod
    def _check_response(r, expected_codes, expected_headers, expected_payload):
        """
        Compares a response with the expectations of a test. Raises TestException at the first mismatch
        :param r: response object returned by the session
        :param expected_codes: see test()
        :param expected_headers: see test()
        :param expected_payload: see test()
        :return: None
        """
        if expected_codes:
            if isinstance(expected_codes, int):
                expected_codes = (expected_codes,)
            if r.status_code not in expected_codes:
                raise TestException(
                    "Got status {}. Expected {}. {}".format(r.status_code, expected_codes, r.text))

        if expected_headers:
            for header_key, header_val in expected_headers.items():
                if header_key.lower() not in r.headers:
                    raise TestException("Header {} not present".format(header_key))
                # both sides are lower-cased, so the containment check is really case-insensitive
                if header_val and header_val.lower() not in r.headers[header_key].lower():
                    raise TestException("Header {} does not contain {} but {}".format(header_key, header_val,
                                                                                      r.headers[header_key]))

        if expected_payload is None:
            return
        if expected_payload == 0 and len(r.content) > 0:
            raise TestException("Expected empty payload")
        elif expected_payload == "json":
            try:
                r.json()
            except Exception as e:
                raise TestException("Expected json response payload, but got Exception {}".format(e)) from e
        elif expected_payload == "yaml":
            try:
                yaml.safe_load(r.text)
            except Exception as e:
                raise TestException("Expected yaml response payload, but got Exception {}".format(e)) from e
        elif expected_payload in ("zip", "text"):
            # the content itself is not parsed, it is only required to be non-empty
            if len(r.content) == 0:
                raise TestException("Expected some response payload, but got empty")

    def test(self, name, method, url, headers, payload, expected_codes, expected_headers, expected_payload):
        """
        Performs an http request and checks the response. Exits with code 1 if the payload file cannot be
        read, the request cannot be completed or the response is different than expected
        :param name: name of the test
        :param method: HTTP method: GET,PUT,POST,DELETE,...
        :param url: complete URL or relative URL. If empty, the base URL is used
        :param headers: request headers to add to the base headers
        :param payload: Can be a dict, transformed to json, a text or a file if starts with '@'
            ('@b' to read the file in binary mode)
        :param expected_codes: expected response codes, can be int, int tuple or int range
        :param expected_headers: expected response headers, dict with key values. Values are matched as
            case-insensitive substrings; an empty value only checks that the header is present
        :param expected_payload: expected payload, 0 if empty, 'yaml', 'json', 'text', 'zip'
        :return: the response object
        """
        # Obtained here instead of relying on a module global, so the class also works when imported
        log = logging.getLogger("NBI")

        if not self.s:
            self.s = requests.session()
        if not url:
            url = self.url_base
        elif not url.startswith("http"):
            url = self.url_base + url

        if payload:
            if isinstance(payload, str):
                if payload.startswith("@"):
                    try:
                        payload = self._read_payload_file(payload)
                    except IOError as e:
                        log.error("Cannot open file {}".format(e))
                        sys.exit(1)
            elif isinstance(payload, dict):
                payload = json.dumps(payload)

        test = "Test {} {} {}".format(name, method, url)
        log.warning(test)
        try:
            r = getattr(self.s, method.lower())(url, data=payload, headers=headers, verify=self.verify,
                                                stream=False)
        except requests.exceptions.RequestException as e:
            # requests errors derive from IOError; handled apart so they are not reported as file errors
            log.error("{}. Request failed: {}".format(test, e))
            sys.exit(1)
        log.debug("RX {}: {}".format(r.status_code, r.text))

        try:
            self._check_response(r, expected_codes, expected_headers, expected_payload)
        except TestException as e:
            log.error("{} \nRX code{}: {}".format(e, r.status_code, r.text))
            sys.exit(1)
        return r
175
176
if __name__ == "__main__":
    # Script entry point: parses the command line, then runs the whole test
    # sequence against the server (negative tests, token request, NSD and
    # VNFD onboarding, query and deletion). Prints "PASS" when every step
    # succeeds and exits with a non-zero code otherwise.
    #
    # "logger" is assigned at module level on purpose, so that the rest of
    # this module can log through the same 'NBI' logger.
    logging.basicConfig(format="%(levelname)s %(message)s", level=logging.ERROR)
    logger = logging.getLogger('NBI')
    try:
        # load parameters and configuration
        try:
            opts, _ = getopt.getopt(sys.argv[1:], "hvu:p:",
                                    ["url=", "user=", "password=", "help", "version", "verbose",
                                     "no-verbose", "project=", "insecure"])
        except getopt.GetoptError as e:
            logger.error("Invalid command line: {}".format(e))
            usage()
            sys.exit(2)

        url = "https://localhost:9999/osm"
        user = password = project = "admin"
        verbose = 0
        verify = True

        for o, a in opts:
            # Single options are compared with "==": `o in ("--url")` is a
            # substring test on a string, which made "-u" match "--url".
            if o == "--version":
                print("test version " + __version__ + ' ' + version_date)
                sys.exit()
            elif o in ("-v", "--verbose"):
                verbose += 1
            elif o == "--no-verbose":
                verbose = -1
            elif o in ("-h", "--help"):
                usage()
                sys.exit()
            elif o == "--url":
                url = a
            elif o in ("-u", "--user"):
                user = a
            elif o in ("-p", "--password"):
                password = a
            elif o == "--project":
                project = a
            elif o == "--insecure":
                verify = False
            else:
                # not an assert: asserts are removed when running with -O
                raise ValueError("Unhandled option {}".format(o))

        # Each -v raises the verbosity one step; --no-verbose lowers it
        if verbose < 0:
            logger.setLevel(logging.ERROR)
        elif verbose == 0:
            logger.setLevel(logging.WARNING)
        elif verbose == 1:
            logger.setLevel(logging.INFO)
        else:
            logger.setLevel(logging.DEBUG)

        # The certificate is verified unless --insecure is given
        test_rest = TestRest(url, verify=verify)

        # tests without authorization
        for t in test_not_authorized_list:
            test_rest.test(*t)

        # get token
        r = test_rest.test("Obtain token", "POST", "/admin/v1/tokens", headers_json,
                           {"username": user, "password": password, "project_id": project},
                           (200, 201), {"Content-Type": "application/json"}, "json")
        token = r.json()["id"]
        test_rest.set_header({"Authorization": "Bearer {}".format(token)})

        # tests once authorized
        for t in test_authorized_list:
            test_rest.test(*t)

        # nsd CREATE
        r = test_rest.test("Onboard NSD step 1", "POST", "/nsd/v1/ns_descriptors", headers_json, None,
                           201, {"Location": "/nsd/v1/ns_descriptors/", "Content-Type": "application/json"},
                           "json")
        # the identifier of the new descriptor is the last path segment of the Location header
        nsd_id = r.headers["Location"].rsplit("/", 1)[-1]

        # nsd UPLOAD text
        test_rest.test("Onboard NSD step 2 as TEXT", "PUT",
                       "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
                       r_header_text, "@./cirros_ns/cirros_nsd.yaml", 204, None, 0)

        # nsd SHOW OSM format
        test_rest.test("Show NSD OSM format", "GET", "/nsd/v1/ns_descriptors_content/{}".format(nsd_id),
                       headers_json, None, 200, r_header_json, "json")

        # nsd SHOW text
        test_rest.test("Show NSD SOL005 text", "GET", "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
                       headers_text, None, 200, r_header_text, "text")

        # nsd UPLOAD ZIP
        makedirs("temp", exist_ok=True)
        with tarfile.open("temp/cirros_ns.tar.gz", "w:gz") as tar:
            tar.add("cirros_ns")
        test_rest.test("Onboard NSD step 3 replace with ZIP", "PUT",
                       "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
                       r_header_zip, "@b./temp/cirros_ns.tar.gz", 204, None, 0)

        # nsd SHOW OSM format
        test_rest.test("Show NSD OSM format", "GET", "/nsd/v1/ns_descriptors_content/{}".format(nsd_id),
                       headers_json, None, 200, r_header_json, "json")

        # nsd SHOW zip
        test_rest.test("Show NSD SOL005 zip", "GET", "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
                       headers_zip, None, 200, r_header_zip, "zip")

        # nsd SHOW descriptor
        test_rest.test("Show NSD descriptor", "GET", "/nsd/v1/ns_descriptors/{}/nsd".format(nsd_id),
                       headers_text, None, 200, r_header_text, "text")

        # nsd SHOW artifact
        test_rest.test("Show NSD artifact", "GET",
                       "/nsd/v1/ns_descriptors/{}/artifacts/icons/osm_2x.png".format(nsd_id),
                       headers_text, None, 200, r_header_octect, "text")

        # nsd DELETE
        test_rest.test("Delete NSD SOL005 text", "DELETE", "/nsd/v1/ns_descriptors/{}".format(nsd_id),
                       headers_yaml, None, 204, None, 0)

        # vnfd CREATE
        r = test_rest.test("Onboard VNFD step 1", "POST", "/vnfpkgm/v1/vnf_packages", headers_json, None,
                           201, {"Location": "/vnfpkgm/v1/vnf_packages/", "Content-Type": "application/json"},
                           "json")
        vnfd_id = r.headers["Location"].rsplit("/", 1)[-1]

        # vnfd UPLOAD text
        test_rest.test("Onboard VNFD step 2 as TEXT", "PUT",
                       "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
                       r_header_text, "@./cirros_vnf/cirros_vnfd.yaml", 204, None, 0)

        # vnfd SHOW OSM format
        test_rest.test("Show VNFD OSM format", "GET", "/vnfpkgm/v1/vnf_packages_content/{}".format(vnfd_id),
                       headers_json, None, 200, r_header_json, "json")

        # vnfd SHOW text
        test_rest.test("Show VNFD SOL005 text", "GET",
                       "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
                       headers_text, None, 200, r_header_text, "text")

        # vnfd UPLOAD ZIP
        makedirs("temp", exist_ok=True)
        with tarfile.open("temp/cirros_vnf.tar.gz", "w:gz") as tar:
            tar.add("cirros_vnf")
        test_rest.test("Onboard VNFD step 3 replace with ZIP", "PUT",
                       "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
                       r_header_zip, "@b./temp/cirros_vnf.tar.gz", 204, None, 0)

        # vnfd SHOW OSM format
        test_rest.test("Show VNFD OSM format", "GET", "/vnfpkgm/v1/vnf_packages_content/{}".format(vnfd_id),
                       headers_json, None, 200, r_header_json, "json")

        # vnfd SHOW zip
        test_rest.test("Show VNFD SOL005 zip", "GET",
                       "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
                       headers_zip, None, 200, r_header_zip, "zip")

        # vnfd SHOW descriptor
        test_rest.test("Show VNFD descriptor", "GET", "/vnfpkgm/v1/vnf_packages/{}/vnfd".format(vnfd_id),
                       headers_text, None, 200, r_header_text, "text")

        # vnfd SHOW artifact
        test_rest.test("Show VNFD artifact", "GET",
                       "/vnfpkgm/v1/vnf_packages/{}/artifacts/icons/cirros-64.png".format(vnfd_id),
                       headers_text, None, 200, r_header_octect, "text")

        # vnfd DELETE
        test_rest.test("Delete VNFD SOL005 text", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id),
                       headers_yaml, None, 204, None, 0)

        print("PASS")

    except Exception as e:
        # Unexpected failure: log it with the traceback and report it through the exit code
        logger.critical("Exception: " + str(e), exc_info=True)
        sys.exit(1)