improvement to the system test
[osm/NBI.git] / osm_nbi / tests / test.py
1 #! /usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 import getopt
5 import sys
6 import requests
7 import json
8 import logging
9 import yaml
10 # import json
11 # import tarfile
12 from time import sleep
13 import os
14
15 __author__ = "Alfonso Tierno, alfonso.tiernosepulveda@telefonica.com"
16 __date__ = "$2018-03-01$"
17 __version__ = "0.2"
18 version_date = "Jul 2018"
19
20
21 def usage():
22 print("Usage: ", sys.argv[0], "[options]")
23 print(" Performs system tests over running NBI. It can be used for real OSM test using option '--test-osm'")
24 print(" If this is the case env variables 'OSMNBITEST_VIM_NAME' must be suplied to create a VIM if not exist "
25 "where deployment is done")
26 print("OPTIONS")
27 print(" -h|--help: shows this help")
28 print(" --insecure: Allows non trusted https NBI server")
29 print(" --list: list available tests")
30 print(" --manual-check: Deployment tests stop after deployed to allow manual inspection. Only make sense with "
31 "'--test-osm'")
32 print(" -p|--password PASSWORD: NBI access password. 'admin' by default")
33 print(" ---project PROJECT: NBI access project. 'admin' by default")
34 print(" --test TEST[,...]: Execute only a test or a comma separated list of tests")
35 print(" --params key=val: params to the previous test. key can be vnfd-files, nsd-file, ns-name, ns-config")
36 print(" --test-osm: If missing this test is intended for NBI only, no other OSM components are expected. Use "
37 "this flag to test the system. LCM and RO components are expected to be up and running")
38 print(" --timeout TIMEOUT: General NBI timeout, by default {}s".format(timeout))
39 print(" --timeout-deploy TIMEOUT: Timeout used for getting NS deployed, by default {}s".format(timeout_deploy))
40 print(" --timeout-configure TIMEOUT: Timeout used for getting NS deployed and configured,"
41 " by default {}s".format(timeout_configure))
42 print(" -u|--user USERNAME: NBI access username. 'admin' by default")
43 print(" --url URL: complete NBI server URL. 'https//localhost:9999/osm' by default")
44 print(" -v|--verbose print debug information, can be used several times")
45 print(" --no-verbose remove verbosity")
46 print(" --version: prints current version")
47 print("ENV variables used for real deployment tests with option osm-test.")
48 print(" export OSMNBITEST_VIM_NAME=vim-name")
49 print(" export OSMNBITEST_VIM_URL=vim-url")
50 print(" export OSMNBITEST_VIM_TYPE=vim-type")
51 print(" export OSMNBITEST_VIM_TENANT=vim-tenant")
52 print(" export OSMNBITEST_VIM_USER=vim-user")
53 print(" export OSMNBITEST_VIM_PASSWORD=vim-password")
54 print(" export OSMNBITEST_VIM_CONFIG=\"vim-config\"")
55 print(" export OSMNBITEST_NS_NAME=\"vim-config\"")
56 return
57
58
59 r_header_json = {"Content-type": "application/json"}
60 headers_json = {
61 "Content-type": "application/json",
62 "Accept": "application/json",
63 }
64 r_header_yaml = {"Content-type": "application/yaml"}
65 headers_yaml = {
66 "Content-type": "application/yaml",
67 "Accept": "application/yaml",
68 }
69 r_header_text = {"Content-type": "text/plain"}
70 r_header_octect = {"Content-type": "application/octet-stream"}
71 headers_text = {
72 "Accept": "text/plain",
73 }
74 r_header_zip = {"Content-type": "application/zip"}
75 headers_zip = {
76 "Accept": "application/zip",
77 }
78 headers_zip_yaml = {
79 "Accept": "application/yaml", "Content-type": "application/zip"
80 }
81
82
83 # test ones authorized
84 test_authorized_list = (
85 ("AU1", "Invalid vnfd id", "GET", "/vnfpkgm/v1/vnf_packages/non-existing-id",
86 headers_json, None, 404, r_header_json, "json"),
87 ("AU2", "Invalid nsd id", "GET", "/nsd/v1/ns_descriptors/non-existing-id",
88 headers_yaml, None, 404, r_header_yaml, "yaml"),
89 ("AU3", "Invalid nsd id", "DELETE", "/nsd/v1/ns_descriptors_content/non-existing-id",
90 headers_yaml, None, 404, r_header_yaml, "yaml"),
91 )
92 timeout = 120 # general timeout
93 timeout_deploy = 60*10 # timeout for NS deploying without charms
94 timeout_configure = 60*20 # timeout for NS deploying and configuring
95
96
97 class TestException(Exception):
98 pass
99
100
101 class TestRest:
102 def __init__(self, url_base, header_base=None, verify=False, user="admin", password="admin", project="admin"):
103 self.url_base = url_base
104 if header_base is None:
105 self.header_base = {}
106 else:
107 self.header_base = header_base.copy()
108 self.s = requests.session()
109 self.s.headers = self.header_base
110 self.verify = verify
111 self.token = False
112 self.user = user
113 self.password = password
114 self.project = project
115 self.vim_id = None
116 # contains ID of tests obtained from Location response header. "" key contains last obtained id
117 self.test_ids = {}
118 self.old_test_description = ""
119
120 def set_header(self, header):
121 self.s.headers.update(header)
122
123 def test(self, name, description, method, url, headers, payload, expected_codes, expected_headers,
124 expected_payload):
125 """
126 Performs an http request and check http code response. Exit if different than allowed. It get the returned id
127 that can be used by following test in the URL with {name} where name is the name of the test
128 :param name: short name of the test
129 :param description: description of the test
130 :param method: HTTP method: GET,PUT,POST,DELETE,...
131 :param url: complete URL or relative URL
132 :param headers: request headers to add to the base headers
133 :param payload: Can be a dict, transformed to json, a text or a file if starts with '@'
134 :param expected_codes: expected response codes, can be int, int tuple or int range
135 :param expected_headers: expected response headers, dict with key values
136 :param expected_payload: expected payload, 0 if empty, 'yaml', 'json', 'text', 'zip'
137 :return: requests response
138 """
139 r = None
140 try:
141 if not self.s:
142 self.s = requests.session()
143 # URL
144 if not url:
145 url = self.url_base
146 elif not url.startswith("http"):
147 url = self.url_base + url
148
149 var_start = url.find("<") + 1
150 while var_start:
151 var_end = url.find(">", var_start)
152 if var_end == -1:
153 break
154 var_name = url[var_start:var_end]
155 if var_name in self.test_ids:
156 url = url[:var_start-1] + self.test_ids[var_name] + url[var_end+1:]
157 var_start += len(self.test_ids[var_name])
158 var_start = url.find("<", var_start) + 1
159 if payload:
160 if isinstance(payload, str):
161 if payload.startswith("@"):
162 mode = "r"
163 file_name = payload[1:]
164 if payload.startswith("@b"):
165 mode = "rb"
166 file_name = payload[2:]
167 with open(file_name, mode) as f:
168 payload = f.read()
169 elif isinstance(payload, dict):
170 payload = json.dumps(payload)
171
172 test_description = "Test {} {} {} {}".format(name, description, method, url)
173 if self.old_test_description != test_description:
174 self.old_test_description = test_description
175 logger.warning(test_description)
176 stream = False
177 # if expected_payload == "zip":
178 # stream = True
179 r = getattr(self.s, method.lower())(url, data=payload, headers=headers, verify=self.verify, stream=stream)
180 logger.debug("RX {}: {}".format(r.status_code, r.text))
181
182 # check response
183 if expected_codes:
184 if isinstance(expected_codes, int):
185 expected_codes = (expected_codes,)
186 if r.status_code not in expected_codes:
187 raise TestException(
188 "Got status {}. Expected {}. {}".format(r.status_code, expected_codes, r.text))
189
190 if expected_headers:
191 for header_key, header_val in expected_headers.items():
192 if header_key.lower() not in r.headers:
193 raise TestException("Header {} not present".format(header_key))
194 if header_val and header_val.lower() not in r.headers[header_key]:
195 raise TestException("Header {} does not contain {} but {}".format(header_key, header_val,
196 r.headers[header_key]))
197
198 if expected_payload is not None:
199 if expected_payload == 0 and len(r.content) > 0:
200 raise TestException("Expected empty payload")
201 elif expected_payload == "json":
202 try:
203 r.json()
204 except Exception as e:
205 raise TestException("Expected json response payload, but got Exception {}".format(e))
206 elif expected_payload == "yaml":
207 try:
208 yaml.safe_load(r.text)
209 except Exception as e:
210 raise TestException("Expected yaml response payload, but got Exception {}".format(e))
211 elif expected_payload == "zip":
212 if len(r.content) == 0:
213 raise TestException("Expected some response payload, but got empty")
214 # try:
215 # tar = tarfile.open(None, 'r:gz', fileobj=r.raw)
216 # for tarinfo in tar:
217 # tarname = tarinfo.name
218 # print(tarname)
219 # except Exception as e:
220 # raise TestException("Expected zip response payload, but got Exception {}".format(e))
221 elif expected_payload == "text":
222 if len(r.content) == 0:
223 raise TestException("Expected some response payload, but got empty")
224 # r.text
225 location = r.headers.get("Location")
226 if location:
227 _id = location[location.rfind("/") + 1:]
228 if _id:
229 self.test_ids[name] = str(_id)
230 self.test_ids[""] = str(_id) # last id
231 return r
232 except TestException as e:
233 r_status_code = None
234 r_text = None
235 if r:
236 r_status_code = r.status_code
237 r_text = r.text
238 logger.error("{} \nRX code{}: {}".format(e, r_status_code, r_text))
239 exit(1)
240 except IOError as e:
241 logger.error("Cannot open file {}".format(e))
242 exit(1)
243
244 def get_autorization(self): # user=None, password=None, project=None):
245 if self.token: # and self.user == user and self.password == password and self.project == project:
246 return
247 # self.user = user
248 # self.password = password
249 # self.project = project
250 r = self.test("TOKEN", "Obtain token", "POST", "/admin/v1/tokens", headers_json,
251 {"username": self.user, "password": self.password, "project_id": self.project},
252 (200, 201), {"Content-Type": "application/json"}, "json")
253 response = r.json()
254 self.token = response["id"]
255 self.set_header({"Authorization": "Bearer {}".format(self.token)})
256
257 def get_create_vim(self, test_osm):
258 if self.vim_id:
259 return self.vim_id
260 self.get_autorization()
261 if test_osm:
262 vim_name = os.environ.get("OSMNBITEST_VIM_NAME")
263 if not vim_name:
264 raise TestException(
265 "Needed to define OSMNBITEST_VIM_XXX variables to create a real VIM for deployment")
266 else:
267 vim_name = "fakeVim"
268 # Get VIM
269 r = self.test("_VIMGET1", "Get VIM ID", "GET", "/admin/v1/vim_accounts?name={}".format(vim_name), headers_json,
270 None, 200, r_header_json, "json")
271 vims = r.json()
272 if vims:
273 return vims[0]["_id"]
274 # Add VIM
275 if test_osm:
276 # check needed environ parameters:
277 if not os.environ.get("OSMNBITEST_VIM_URL") or not os.environ.get("OSMNBITEST_VIM_TENANT"):
278 raise TestException("Env OSMNBITEST_VIM_URL and OSMNBITEST_VIM_TENANT are needed for create a real VIM"
279 " to deploy on whit the --test-osm option")
280 vim_data = "{{schema_version: '1.0', name: '{}', vim_type: {}, vim_url: '{}', vim_tenant_name: '{}', "\
281 "vim_user: {}, vim_password: {}".format(vim_name,
282 os.environ.get("OSMNBITEST_VIM_TYPE", "openstack"),
283 os.environ.get("OSMNBITEST_VIM_URL"),
284 os.environ.get("OSMNBITEST_VIM_TENANT"),
285 os.environ.get("OSMNBITEST_VIM_USER"),
286 os.environ.get("OSMNBITEST_VIM_PASSWORD"))
287 if os.environ.get("OSMNBITEST_VIM_CONFIG"):
288 vim_data += " ,config: {}".format(os.environ.get("OSMNBITEST_VIM_CONFIG"))
289 vim_data += "}"
290 else:
291 vim_data = "{schema_version: '1.0', name: fakeVim, vim_type: openstack, vim_url: 'http://10.11.12.13/fake'"\
292 ", vim_tenant_name: 'vimtenant', vim_user: vimuser, vim_password: vimpassword}"
293 r = self.test("_VIMGET2", "Create VIM", "POST", "/admin/v1/vim_accounts", headers_yaml, vim_data,
294 (201), {"Location": "/admin/v1/vim_accounts/", "Content-Type": "application/yaml"}, "yaml")
295 location = r.headers.get("Location")
296 return location[location.rfind("/") + 1:]
297
298
299 class TestNonAuthorized:
300 description = "test invalid URLs. methods and no authorization"
301
302 @staticmethod
303 def run(engine, test_osm, manual_check):
304 test_not_authorized_list = (
305 ("NA1", "Invalid token", "GET", "/admin/v1/users", headers_json, None, 401, r_header_json, "json"),
306 ("NA2", "Invalid URL", "POST", "/admin/v1/nonexist", headers_yaml, None, 405, r_header_yaml, "yaml"),
307 ("NA3", "Invalid version", "DELETE", "/admin/v2/users", headers_yaml, None, 405, r_header_yaml, "yaml"),
308 )
309 for t in test_not_authorized_list:
310 engine.test(*t)
311
312
313 class TestFakeVim:
314 description = "Creates/edit/delete fake VIMs and SDN controllers"
315
316 def __init__(self):
317 self.vim = {
318 "schema_version": "1.0",
319 "schema_type": "No idea",
320 "name": "myVim",
321 "description": "Descriptor name",
322 "vim_type": "openstack",
323 "vim_url": "http://localhost:/vim",
324 "vim_tenant_name": "vimTenant",
325 "vim_user": "user",
326 "vim_password": "password",
327 "config": {"config_param": 1}
328 }
329 self.sdn = {
330 "name": "sdn-name",
331 "description": "sdn-description",
332 "dpid": "50:50:52:54:00:94:21:21",
333 "ip": "192.168.15.17",
334 "port": 8080,
335 "type": "opendaylight",
336 "version": "3.5.6",
337 "user": "user",
338 "password": "passwd"
339 }
340 self.port_mapping = [
341 {"compute_node": "compute node 1",
342 "ports": [{"pci": "0000:81:00.0", "switch_port": "port-2/1", "switch_mac": "52:54:00:94:21:21"},
343 {"pci": "0000:81:00.1", "switch_port": "port-2/2", "switch_mac": "52:54:00:94:21:22"}
344 ]},
345 {"compute_node": "compute node 2",
346 "ports": [{"pci": "0000:81:00.0", "switch_port": "port-2/3", "switch_mac": "52:54:00:94:21:23"},
347 {"pci": "0000:81:00.1", "switch_port": "port-2/4", "switch_mac": "52:54:00:94:21:24"}
348 ]}
349 ]
350
351 def run(self, engine, test_osm, manual_check):
352
353 vim_bad = self.vim.copy()
354 vim_bad.pop("name")
355
356 engine.get_autorization()
357 engine.test("FVIM1", "Create VIM", "POST", "/admin/v1/vim_accounts", headers_json, self.vim, (201, 204),
358 {"Location": "/admin/v1/vim_accounts/", "Content-Type": "application/json"}, "json")
359 engine.test("FVIM2", "Create VIM without name, bad schema", "POST", "/admin/v1/vim_accounts", headers_json,
360 vim_bad, 422, None, headers_json)
361 engine.test("FVIM3", "Create VIM name repeated", "POST", "/admin/v1/vim_accounts", headers_json, self.vim,
362 409, None, headers_json)
363 engine.test("FVIM4", "Show VIMs", "GET", "/admin/v1/vim_accounts", headers_yaml, None, 200, r_header_yaml,
364 "yaml")
365 engine.test("FVIM5", "Show VIM", "GET", "/admin/v1/vim_accounts/<FVIM1>", headers_yaml, None, 200,
366 r_header_yaml, "yaml")
367 if not test_osm:
368 # delete with FORCE
369 engine.test("FVIM6", "Delete VIM", "DELETE", "/admin/v1/vim_accounts/<FVIM1>?FORCE=True", headers_yaml,
370 None, 202, None, 0)
371 engine.test("FVIM7", "Check VIM is deleted", "GET", "/admin/v1/vim_accounts/<FVIM1>", headers_yaml, None,
372 404, r_header_yaml, "yaml")
373 else:
374 # delete and wait until is really deleted
375 engine.test("FVIM6", "Delete VIM", "DELETE", "/admin/v1/vim_accounts/<FVIM1>", headers_yaml, None, 202,
376 None, 0)
377 wait = timeout
378 while wait >= 0:
379 r = engine.test("FVIM7", "Check VIM is deleted", "GET", "/admin/v1/vim_accounts/<FVIM1>", headers_yaml,
380 None, None, r_header_yaml, "yaml")
381 if r.status_code == 404:
382 break
383 elif r.status_code == 200:
384 wait -= 5
385 sleep(5)
386 else:
387 raise TestException("Vim created at 'FVIM1' is not delete after {} seconds".format(timeout))
388
389
390 class TestVIMSDN(TestFakeVim):
391 description = "Creates VIM with SDN editing SDN controllers and port_mapping"
392
393 def __init__(self):
394 TestFakeVim.__init__(self)
395
396 def run(self, engine, test_osm, manual_check):
397 engine.get_autorization()
398 # Added SDN
399 engine.test("SDN1", "Create SDN", "POST", "/admin/v1/sdns", headers_json, self.sdn, (201, 204),
400 {"Location": "/admin/v1/sdns/", "Content-Type": "application/json"}, "json")
401 sleep(5)
402 # Edit SDN
403 engine.test("SDN1", "Edit SDN", "PATCH", "/admin/v1/sdns/<SDN1>", headers_json, {"name": "new_sdn_name"},
404 (200, 201, 204), r_header_json, "json")
405 sleep(5)
406 # VIM with SDN
407 self.vim["config"]["sdn-controller"] = engine.test_ids["SDN1"]
408 self.vim["config"]["sdn-port-mapping"] = self.port_mapping
409 engine.test("VIM2", "Create VIM", "POST", "/admin/v1/vim_accounts", headers_json, self.vim, (200, 204, 201),
410 {"Location": "/admin/v1/vim_accounts/", "Content-Type": "application/json"}, "json"),
411
412 self.port_mapping[0]["compute_node"] = "compute node XX"
413 engine.test("VIMSDN2", "Edit VIM change port-mapping", "PUT", "/admin/v1/vim_accounts/<VIM2>", headers_json,
414 {"config": {"sdn-port-mapping": self.port_mapping}},
415 (200, 201, 204), {"Content-Type": "application/json"}, "json")
416 engine.test("VIMSDN3", "Edit VIM remove port-mapping", "PUT", "/admin/v1/vim_accounts/<VIM2>", headers_json,
417 {"config": {"sdn-port-mapping": None}},
418 (200, 201, 204), {"Content-Type": "application/json"}, "json")
419 engine.test("VIMSDN4", "Delete VIM remove port-mapping", "DELETE", "/admin/v1/vim_accounts/<VIM2>",
420 headers_json, None, (202, 201, 204), None, 0)
421 engine.test("VIMSDN5", "Delete SDN", "DELETE", "/admin/v1/sdns/<SDN1>", headers_json, None,
422 (202, 201, 204), None, 0)
423
424
425 class TestDeploy:
426 description = "Base class for downloading descriptors from ETSI, onboard and deploy in real VIM"
427
428 def __init__(self):
429 self.step = 0
430 self.nsd_id = None
431 self.vim_id = None
432 self.nsd_test = None
433 self.ns_test = None
434 self.vnfds_test = []
435 self.descriptor_url = "https://osm-download.etsi.org/ftp/osm-3.0-three/2nd-hackfest/packages/"
436 self.vnfd_filenames = ("cirros_vnf.tar.gz",)
437 self.nsd_filename = "cirros_2vnf_ns.tar.gz"
438 self.uses_configuration = False
439
440 def create_descriptors(self, engine):
441 temp_dir = os.path.dirname(__file__) + "/temp/"
442 if not os.path.exists(temp_dir):
443 os.makedirs(temp_dir)
444 for vnfd_filename in self.vnfd_filenames:
445 if "/" in vnfd_filename:
446 vnfd_filename_path = vnfd_filename
447 if not os.path.exists(vnfd_filename_path):
448 raise TestException("File '{}' does not exist".format(vnfd_filename_path))
449 else:
450 vnfd_filename_path = temp_dir + vnfd_filename
451 if not os.path.exists(vnfd_filename_path):
452 with open(vnfd_filename_path, "wb") as file:
453 response = requests.get(self.descriptor_url + vnfd_filename)
454 if response.status_code >= 300:
455 raise TestException("Error downloading descriptor from '{}': {}".format(
456 self.descriptor_url + vnfd_filename, response.status_code))
457 file.write(response.content)
458 if vnfd_filename_path.endswith(".yaml"):
459 headers = headers_yaml
460 else:
461 headers = headers_zip_yaml
462 if self.step % 2 == 0:
463 # vnfd CREATE AND UPLOAD in one step:
464 engine.test("DEPLOY{}".format(self.step), "Onboard VNFD in one step", "POST",
465 "/vnfpkgm/v1/vnf_packages_content", headers, "@b" + vnfd_filename_path, 201,
466 {"Location": "/vnfpkgm/v1/vnf_packages_content/", "Content-Type": "application/yaml"}, yaml)
467 self.vnfds_test.append("DEPLOY" + str(self.step))
468 self.step += 1
469 else:
470 # vnfd CREATE AND UPLOAD ZIP
471 engine.test("DEPLOY{}".format(self.step), "Onboard VNFD step 1", "POST", "/vnfpkgm/v1/vnf_packages",
472 headers_json, None, 201,
473 {"Location": "/vnfpkgm/v1/vnf_packages/", "Content-Type": "application/json"}, "json")
474 self.vnfds_test.append("DEPLOY" + str(self.step))
475 self.step += 1
476 # location = r.headers["Location"]
477 # vnfd_id = location[location.rfind("/")+1:]
478 engine.test("DEPLOY{}".format(self.step), "Onboard VNFD step 2 as ZIP", "PUT",
479 "/vnfpkgm/v1/vnf_packages/<>/package_content",
480 headers, "@b" + vnfd_filename_path, 204, None, 0)
481 self.step += 2
482
483 if "/" in self.nsd_filename:
484 nsd_filename_path = self.nsd_filename
485 if not os.path.exists(nsd_filename_path):
486 raise TestException("File '{}' does not exist".format(nsd_filename_path))
487 else:
488 nsd_filename_path = temp_dir + self.nsd_filename
489 if not os.path.exists(nsd_filename_path):
490 with open(nsd_filename_path, "wb") as file:
491 response = requests.get(self.descriptor_url + self.nsd_filename)
492 if response.status_code >= 300:
493 raise TestException("Error downloading descriptor from '{}': {}".format(
494 self.descriptor_url + self.nsd_filename, response.status_code))
495 file.write(response.content)
496 if nsd_filename_path.endswith(".yaml"):
497 headers = headers_yaml
498 else:
499 headers = headers_zip_yaml
500
501 self.nsd_test = "DEPLOY" + str(self.step)
502 if self.step % 2 == 0:
503 # nsd CREATE AND UPLOAD in one step:
504 engine.test("DEPLOY{}".format(self.step), "Onboard NSD in one step", "POST",
505 "/nsd/v1/ns_descriptors_content", headers, "@b" + nsd_filename_path, 201,
506 {"Location": "/nsd/v1/ns_descriptors_content/", "Content-Type": "application/yaml"}, yaml)
507 self.step += 1
508 else:
509 # nsd CREATE AND UPLOAD ZIP
510 engine.test("DEPLOY{}".format(self.step), "Onboard NSD step 1", "POST", "/nsd/v1/ns_descriptors",
511 headers_json, None, 201,
512 {"Location": "/nsd/v1/ns_descriptors/", "Content-Type": "application/json"}, "json")
513 self.step += 1
514 # location = r.headers["Location"]
515 # vnfd_id = location[location.rfind("/")+1:]
516 engine.test("DEPLOY{}".format(self.step), "Onboard NSD step 2 as ZIP", "PUT",
517 "/nsd/v1/ns_descriptors/<>/nsd_content",
518 headers, "@b" + nsd_filename_path, 204, None, 0)
519 self.step += 2
520 self.nsd_id = engine.test_ids[self.nsd_test]
521
522 def delete_descriptors(self, engine):
523 # delete descriptors
524 engine.test("DEPLOY{}".format(self.step), "Delete NSSD SOL005", "DELETE",
525 "/nsd/v1/ns_descriptors/<{}>".format(self.nsd_test),
526 headers_yaml, None, 204, None, 0)
527 self.step += 1
528 for vnfd_test in self.vnfds_test:
529 engine.test("DEPLOY{}".format(self.step), "Delete VNFD SOL005", "DELETE",
530 "/vnfpkgm/v1/vnf_packages/<{}>".format(vnfd_test), headers_yaml, None, 204, None, 0)
531 self.step += 1
532
533 def instantiate(self, engine, ns_data):
534 ns_data_text = yaml.safe_dump(ns_data, default_flow_style=True, width=256)
535 # create NS Two steps
536 r = engine.test("DEPLOY{}".format(self.step), "Create NS step 1", "POST", "/nslcm/v1/ns_instances",
537 headers_yaml, ns_data_text, 201,
538 {"Location": "nslcm/v1/ns_instances/", "Content-Type": "application/yaml"}, "yaml")
539 self.ns_test = "DEPLOY{}".format(self.step)
540 engine.test_ids[self.ns_test]
541 self.step += 1
542 r = engine.test("DEPLOY{}".format(self.step), "Instantiate NS step 2", "POST",
543 "/nslcm/v1/ns_instances/<{}>/instantiate".format(self.ns_test), headers_yaml, ns_data_text,
544 201, {"Location": "nslcm/v1/ns_lcm_op_occs/", "Content-Type": "application/yaml"}, "yaml")
545 nslcmop_test = "DEPLOY{}".format(self.step)
546 self.step += 1
547
548 if test_osm:
549 # Wait until status is Ok
550 wait = timeout_configure if self.uses_configuration else timeout_deploy
551 while wait >= 0:
552 r = engine.test("DEPLOY{}".format(self.step), "Wait until NS is deployed and configured", "GET",
553 "/nslcm/v1/ns_lcm_op_occs/<{}>".format(nslcmop_test), headers_json, None,
554 200, r_header_json, "json")
555 nslcmop = r.json()
556 if "COMPLETED" in nslcmop["operationState"]:
557 break
558 elif "FAILED" in nslcmop["operationState"]:
559 raise TestException("NS instantiate has failed: {}".format(nslcmop["detailed-status"]))
560 wait -= 5
561 sleep(5)
562 else:
563 raise TestException("NS instantiate is not done after {} seconds".format(timeout_deploy))
564 self.step += 1
565
566 def _wait_nslcmop_ready(self, engine, nslcmop_test, timeout_deploy):
567 wait = timeout
568 while wait >= 0:
569 r = engine.test("DEPLOY{}".format(self.step), "Wait to ns lcm operation complete", "GET",
570 "/nslcm/v1/ns_lcm_op_occs/<{}>".format(nslcmop_test), headers_json, None,
571 200, r_header_json, "json")
572 nslcmop = r.json()
573 if "COMPLETED" in nslcmop["operationState"]:
574 break
575 elif "FAILED" in nslcmop["operationState"]:
576 raise TestException("NS terminate has failed: {}".format(nslcmop["detailed-status"]))
577 wait -= 5
578 sleep(5)
579 else:
580 raise TestException("NS instantiate is not terminate after {} seconds".format(timeout))
581
582 def terminate(self, engine):
583 # remove deployment
584 if test_osm:
585 r = engine.test("DEPLOY{}".format(self.step), "Terminate NS", "POST",
586 "/nslcm/v1/ns_instances/<{}>/terminate".format(self.ns_test), headers_yaml, None,
587 201, {"Location": "nslcm/v1/ns_lcm_op_occs/", "Content-Type": "application/yaml"}, "yaml")
588 nslcmop2_test = "DEPLOY{}".format(self.step)
589 self.step += 1
590 # Wait until status is Ok
591 self._wait_nslcmop_ready(engine, nslcmop2_test, timeout_deploy)
592
593 r = engine.test("DEPLOY{}".format(self.step), "Delete NS", "DELETE",
594 "/nslcm/v1/ns_instances/<{}>".format(self.ns_test), headers_yaml, None,
595 204, None, 0)
596 self.step += 1
597 else:
598 r = engine.test("DEPLOY{}".format(self.step), "Delete NS with FORCE", "DELETE",
599 "/nslcm/v1/ns_instances/<{}>?FORCE=True".format(self.ns_test), headers_yaml, None,
600 204, None, 0)
601 self.step += 1
602
603 # check all it is deleted
604 r = engine.test("DEPLOY{}".format(self.step), "Check NS is deleted", "GET",
605 "/nslcm/v1/ns_instances/<{}>".format(self.ns_test), headers_yaml, None,
606 404, None, "yaml")
607 self.step += 1
608 r = engine.test("DEPLOY{}".format(self.step), "Check NSLCMOPs are deleted", "GET",
609 "/nslcm/v1/ns_lcm_op_occs?nsInstanceId=<{}>".format(self.ns_test), headers_json, None,
610 200, None, "json")
611 nslcmops = r.json()
612 if not isinstance(nslcmops, list) or nslcmops:
613 raise TestException("NS {} deleted but with ns_lcm_op_occ active: {}".format(self.ns_test, nslcmops))
614
615 def test_ns(self, engine, test_osm):
616 pass
617
618 def aditional_operations(self, engine, test_osm, manual_check):
619 pass
620
621 def run(self, engine, test_osm, manual_check, test_params=None):
622 engine.get_autorization()
623 nsname = os.environ.get("OSMNBITEST_NS_NAME", "OSMNBITEST")
624 if test_params:
625 if "vnfd-files" in test_params:
626 self.vnfd_filenames = test_params["vnfd-files"].split(",")
627 if "nsd-file" in test_params:
628 self.nsd_filename = test_params["nsd-file"]
629 if test_params.get("ns-name"):
630 nsname = test_params["ns-name"]
631 self.create_descriptors(engine)
632
633 # create real VIM if not exist
634 self.vim_id = engine.get_create_vim(test_osm)
635 ns_data = {"nsDescription": "default description", "nsName": nsname, "nsdId": self.nsd_id,
636 "vimAccountId": self.vim_id}
637 if test_params and test_params.get("ns-config"):
638 if isinstance(test_params["ns-config"], str):
639 ns_data.update(yaml.load(test_params["ns-config"]))
640 else:
641 ns_data.update(test_params["ns-config"])
642 self.instantiate(engine, ns_data)
643
644 if manual_check:
645 input('NS has been deployed. Perform manual check and press enter to resume')
646 else:
647 self.test_ns(engine, test_osm)
648 self.aditional_operations(engine, test_osm, manual_check)
649 self.terminate(engine)
650 self.delete_descriptors(engine)
651
652
653 class TestDeployHackfestCirros(TestDeploy):
654 description = "Load and deploy Hackfest cirros_2vnf_ns example"
655
656 def __init__(self):
657 super().__init__()
658 self.vnfd_filenames = ("cirros_vnf.tar.gz",)
659 self.nsd_filename = "cirros_2vnf_ns.tar.gz"
660
661 def run(self, engine, test_osm, manual_check, test_params=None):
662 super().run(engine, test_osm, manual_check, test_params)
663
664
665 class TestDeployIpMac(TestDeploy):
666 description = "Load and deploy descriptor examples setting mac, ip address at descriptor and instantiate params"
667
668 def __init__(self):
669 super().__init__()
670 self.vnfd_filenames = ("vnfd_2vdu_set_ip_mac2.yaml", "vnfd_2vdu_set_ip_mac.yaml")
671 self.nsd_filename = "scenario_2vdu_set_ip_mac.yaml"
672 self.descriptor_url = \
673 "https://osm.etsi.org/gitweb/?p=osm/RO.git;a=blob_plain;f=test/RO_tests/v3_2vdu_set_ip_mac/"
674
675 def run(self, engine, test_osm, manual_check, test_params=None):
676 # super().run(engine, test_osm, manual_check, test_params)
677 # run again setting IPs with instantiate parameters
678 instantiation_params = {
679 "vnf": [
680 {
681 "member-vnf-index": "1",
682 "internal-vld": [
683 {
684 "name": "internal_vld1", # net_internal
685 "ip-profile": {
686 "ip-version": "ipv4",
687 "subnet-address": "10.9.8.0/24",
688 "dhcp-params": {"count": 100, "start-address": "10.9.8.100"}
689 },
690 "internal-connection-point": [
691 {
692 "id-ref": "eth2",
693 "ip-address": "10.9.8.2",
694 },
695 {
696 "id-ref": "eth3",
697 "ip-address": "10.9.8.3",
698 }
699 ]
700 },
701 ],
702
703 "vdu": [
704 {
705 "id": "VM1",
706 "interface": [
707 {
708 "name": "iface11",
709 "floating-ip-required": True,
710 },
711 {
712 "name": "iface13",
713 "mac-address": "52:33:44:55:66:13"
714 },
715 ],
716 },
717 {
718 "id": "VM2",
719 "interface": [
720 {
721 "name": "iface21",
722 "ip-address": "10.31.31.21",
723 "mac-address": "52:33:44:55:66:21"
724 },
725 ],
726 },
727 ]
728 },
729 ]
730 }
731 super().run(engine, test_osm, manual_check, test_params={"ns-config": instantiation_params})
732
733
734 class TestDeployHackfest4(TestDeploy):
735 description = "Load and deploy Hackfest 4 example."
736
737 def __init__(self):
738 super().__init__()
739 self.vnfd_filenames = ("hackfest_4_vnfd.tar.gz",)
740 self.nsd_filename = "hackfest_4_nsd.tar.gz"
741 self.uses_configuration = True
742
743 def create_descriptors(self, engine):
744 super().create_descriptors(engine)
745 # Modify VNFD to add scaling
746 payload = """
747 scaling-group-descriptor:
748 - name: "scale_dataVM"
749 max-instance-count: 10
750 scaling-policy:
751 - name: "auto_cpu_util_above_threshold"
752 scaling-type: "automatic"
753 threshold-time: 0
754 cooldown-time: 60
755 scaling-criteria:
756 - name: "cpu_util_above_threshold"
757 scale-in-threshold: 15
758 scale-in-relational-operation: "LE"
759 scale-out-threshold: 60
760 scale-out-relational-operation: "GE"
761 vnf-monitoring-param-ref: "all_aaa_cpu_util"
762 vdu:
763 - vdu-id-ref: dataVM
764 count: 1
765 scaling-config-action:
766 - trigger: post-scale-out
767 vnf-config-primitive-name-ref: touch
768 - trigger: pre-scale-in
769 vnf-config-primitive-name-ref: touch
770 vnf-configuration:
771 config-primitive:
772 - name: touch
773 parameter:
774 - name: filename
775 data-type: STRING
776 default-value: '/home/ubuntu/touched'
777 """
778 engine.test("DEPLOY{}".format(self.step), "Edit VNFD ", "PATCH",
779 "/vnfpkgm/v1/vnf_packages/<{}>".format(self.vnfds_test[0]),
780 headers_yaml, payload, 200,
781 r_header_yaml, yaml)
782 self.vnfds_test.append("DEPLOY" + str(self.step))
783 self.step += 1
784
785 def run(self, engine, test_osm, manual_check, test_params=None):
786 super().run(engine, test_osm, manual_check, test_params)
787
788
789 class TestDeployHackfest3Charmed(TestDeploy):
790 description = "Load and deploy Hackfest 3charmed_ns example. Modifies it for adding scaling and performs " \
791 "primitive actions and scaling"
792
793 def __init__(self):
794 super().__init__()
795 self.vnfd_filenames = ("hackfest_3charmed_vnfd.tar.gz",)
796 self.nsd_filename = "hackfest_3charmed_nsd.tar.gz"
797 self.uses_configuration = True
798
799 # def create_descriptors(self, engine):
800 # super().create_descriptors(engine)
801 # # Modify VNFD to add scaling
802 # payload = """
803 # scaling-group-descriptor:
804 # - name: "scale_dataVM"
805 # max-instance-count: 10
806 # scaling-policy:
807 # - name: "auto_cpu_util_above_threshold"
808 # scaling-type: "automatic"
809 # threshold-time: 0
810 # cooldown-time: 60
811 # scaling-criteria:
812 # - name: "cpu_util_above_threshold"
813 # scale-in-threshold: 15
814 # scale-in-relational-operation: "LE"
815 # scale-out-threshold: 60
816 # scale-out-relational-operation: "GE"
817 # vnf-monitoring-param-ref: "all_aaa_cpu_util"
818 # vdu:
819 # - vdu-id-ref: dataVM
820 # count: 1
821 # scaling-config-action:
822 # - trigger: post-scale-out
823 # vnf-config-primitive-name-ref: touch
824 # - trigger: pre-scale-in
825 # vnf-config-primitive-name-ref: touch
826 # vnf-configuration:
827 # config-primitive:
828 # - name: touch
829 # parameter:
830 # - name: filename
831 # data-type: STRING
832 # default-value: '/home/ubuntu/touched'
833 # """
834 # engine.test("DEPLOY{}".format(self.step), "Edit VNFD ", "PATCH",
835 # "/vnfpkgm/v1/vnf_packages/<{}>".format(self.vnfds_test[0]),
836 # headers_yaml, payload, 200,
837 # r_header_yaml, yaml)
838 # self.vnfds_test.append("DEPLOY" + str(self.step))
839 # self.step += 1
840
841 def aditional_operations(self, engine, test_osm, manual_check):
842 if not test_osm:
843 return
844 # 1 perform action
845 payload = '{member_vnf_index: "2", primitive: touch, primitive_params: { filename: /home/ubuntu/OSMTESTNBI }}'
846 engine.test("DEPLOY{}".format(self.step), "Executer service primitive over NS", "POST",
847 "/nslcm/v1/ns_instances/<{}>/action".format(self.ns_test), headers_yaml, payload,
848 201, {"Location": "nslcm/v1/ns_lcm_op_occs/", "Content-Type": "application/yaml"}, "yaml")
849 nslcmop2_action = "DEPLOY{}".format(self.step)
850 self.step += 1
851 # Wait until status is Ok
852 self._wait_nslcmop_ready(engine, nslcmop2_action, timeout_deploy)
853 if manual_check:
854 input('NS service primitive has been executed. Check that file /home/ubuntu/OSMTESTNBI is present at '
855 'TODO_PUT_IP')
856 # TODO check automatic
857
858 # # 2 perform scale out
859 # payload = '{scaleType: SCALE_VNF, scaleVnfData: {scaleVnfType: SCALE_OUT, scaleByStepData: ' \
860 # '{scaling-group-descriptor: scale_dataVM, member-vnf-index: "1"}}}'
861 # engine.test("DEPLOY{}".format(self.step), "Execute scale action over NS", "POST",
862 # "/nslcm/v1/ns_instances/<{}>/scale".format(self.ns_test), headers_yaml, payload,
863 # 201, {"Location": "nslcm/v1/ns_lcm_op_occs/", "Content-Type": "application/yaml"}, "yaml")
864 # nslcmop2_scale_out = "DEPLOY{}".format(self.step)
865 # self._wait_nslcmop_ready(engine, nslcmop2_scale_out, timeout_deploy)
866 # if manual_check:
867 # input('NS scale out done. Check that file /home/ubuntu/touched is present and new VM is created')
868 # # TODO check automatic
869 #
870 # # 2 perform scale in
871 # payload = '{scaleType: SCALE_VNF, scaleVnfData: {scaleVnfType: SCALE_IN, scaleByStepData: ' \
872 # '{scaling-group-descriptor: scale_dataVM, member-vnf-index: "1"}}}'
873 # engine.test("DEPLOY{}".format(self.step), "Execute scale action over NS", "POST",
874 # "/nslcm/v1/ns_instances/<{}>/scale".format(self.ns_test), headers_yaml, payload,
875 # 201, {"Location": "nslcm/v1/ns_lcm_op_occs/", "Content-Type": "application/yaml"}, "yaml")
876 # nslcmop2_scale_in = "DEPLOY{}".format(self.step)
877 # self._wait_nslcmop_ready(engine, nslcmop2_scale_in, timeout_deploy)
878 # if manual_check:
879 # input('NS scale in done. Check that file /home/ubuntu/touched is updated and new VM is deleted')
880 # # TODO check automatic
881
882 def run(self, engine, test_osm, manual_check, test_params=None):
883 super().run(engine, test_osm, manual_check, test_params)
884
885
886 if __name__ == "__main__":
887 global logger
888 test = ""
889
890 # Disable warnings from self-signed certificates.
891 requests.packages.urllib3.disable_warnings()
892 try:
893 logging.basicConfig(format="%(levelname)s %(message)s", level=logging.ERROR)
894 logger = logging.getLogger('NBI')
895 # load parameters and configuration
896 opts, args = getopt.getopt(sys.argv[1:], "hvu:p:",
897 ["url=", "user=", "password=", "help", "version", "verbose", "no-verbose",
898 "project=", "insecure", "timeout", "timeout-deploy", "timeout-configure",
899 "test=", "list", "test-osm", "manual-check", "params="])
900 url = "https://localhost:9999/osm"
901 user = password = project = "admin"
902 test_osm = False
903 manual_check = False
904 verbose = 0
905 verify = True
906 test_classes = {
907 "NonAuthorized": TestNonAuthorized,
908 "FakeVIM": TestFakeVim,
909 "VIM-SDN": TestVIMSDN,
910 "Deploy-Custom": TestDeploy,
911 "Deploy-Hackfest-Cirros": TestDeployHackfestCirros,
912 "Deploy-Hackfest-3Charmed": TestDeployHackfest3Charmed,
913 "Deploy-Hackfest-4": TestDeployHackfest4,
914 "Deploy-CirrosMacIp": TestDeployIpMac,
915 }
916 test_to_do = []
917 test_params = {}
918
919 for o, a in opts:
920 # print("parameter:", o, a)
921 if o == "--version":
922 print("test version " + __version__ + ' ' + version_date)
923 exit()
924 elif o == "--list":
925 for test, test_class in test_classes.items():
926 print("{:20} {}".format(test + ":", test_class.description))
927 exit()
928 elif o in ("-v", "--verbose"):
929 verbose += 1
930 elif o == "no-verbose":
931 verbose = -1
932 elif o in ("-h", "--help"):
933 usage()
934 sys.exit()
935 elif o == "--test-osm":
936 test_osm = True
937 elif o == "--manual-check":
938 manual_check = True
939 elif o == "--url":
940 url = a
941 elif o in ("-u", "--user"):
942 user = a
943 elif o in ("-p", "--password"):
944 password = a
945 elif o == "--project":
946 project = a
947 elif o == "--test":
948 # print("asdfadf", o, a, a.split(","))
949 for _test in a.split(","):
950 if _test not in test_classes:
951 print("Invalid test name '{}'. Use option '--list' to show available tests".format(_test),
952 file=sys.stderr)
953 exit(1)
954 test_to_do.append(_test)
955 elif o == "--params":
956 param_key, _, param_value = a.partition("=")
957 text_index = len(test_to_do)
958 if text_index not in test_params:
959 test_params[text_index] = {}
960 test_params[text_index][param_key] = param_value
961 elif o == "--insecure":
962 verify = False
963 elif o == "--timeout":
964 timeout = int(a)
965 elif o == "--timeout-deploy":
966 timeout_deploy = int(a)
967 elif o == "--timeout-configure":
968 timeout_configure = int(a)
969 else:
970 assert False, "Unhandled option"
971 if verbose == 0:
972 logger.setLevel(logging.WARNING)
973 elif verbose > 1:
974 logger.setLevel(logging.DEBUG)
975 else:
976 logger.setLevel(logging.ERROR)
977
978 test_rest = TestRest(url, user=user, password=password, project=project)
979 # print("tests to do:", test_to_do)
980 if test_to_do:
981 text_index = 0
982 for test in test_to_do:
983 text_index += 1
984 test_class = test_classes[test]
985 test_class().run(test_rest, test_osm, manual_check, test_params.get(text_index))
986 else:
987 for test, test_class in test_classes.items():
988 test_class().run(test_rest, test_osm, manual_check, test_params.get(0))
989 exit(0)
990
991 # get token
992
993 # # tests once authorized
994 # for t in test_authorized_list:
995 # test_rest.test(*t)
996 #
997 # # tests admin
998 # for t in test_admin_list1:
999 # test_rest.test(*t)
1000 #
1001 # # vnfd CREATE
1002 # r = test_rest.test("VNFD1", "Onboard VNFD step 1", "POST", "/vnfpkgm/v1/vnf_packages", headers_json, None,
1003 # 201, {"Location": "/vnfpkgm/v1/vnf_packages/", "Content-Type": "application/json"}, "json")
1004 # location = r.headers["Location"]
1005 # vnfd_id = location[location.rfind("/")+1:]
1006 # # print(location, vnfd_id)
1007 #
1008 # # vnfd UPLOAD test
1009 # r = test_rest.test("VNFD2", "Onboard VNFD step 2 as TEXT", "PUT",
1010 # "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
1011 # r_header_text, "@./cirros_vnf/cirros_vnfd.yaml", 204, None, 0)
1012 #
1013 # # vnfd SHOW OSM format
1014 # r = test_rest.test("VNFD3", "Show VNFD OSM format", "GET",
1015 # "/vnfpkgm/v1/vnf_packages_content/{}".format(vnfd_id),
1016 # headers_json, None, 200, r_header_json, "json")
1017 #
1018 # # vnfd SHOW text
1019 # r = test_rest.test("VNFD4", "Show VNFD SOL005 text", "GET",
1020 # "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
1021 # headers_text, None, 200, r_header_text, "text")
1022 #
1023 # # vnfd UPLOAD ZIP
1024 # makedirs("temp", exist_ok=True)
1025 # tar = tarfile.open("temp/cirros_vnf.tar.gz", "w:gz")
1026 # tar.add("cirros_vnf")
1027 # tar.close()
1028 # r = test_rest.test("VNFD5", "Onboard VNFD step 3 replace with ZIP", "PUT",
1029 # "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
1030 # r_header_zip, "@b./temp/cirros_vnf.tar.gz", 204, None, 0)
1031 #
1032 # # vnfd SHOW OSM format
1033 # r = test_rest.test("VNFD6", "Show VNFD OSM format", "GET",
1034 # "/vnfpkgm/v1/vnf_packages_content/{}".format(vnfd_id),
1035 # headers_json, None, 200, r_header_json, "json")
1036 #
1037 # # vnfd SHOW zip
1038 # r = test_rest.test("VNFD7", "Show VNFD SOL005 zip", "GET",
1039 # "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
1040 # headers_zip, None, 200, r_header_zip, "zip")
1041 # # vnfd SHOW descriptor
1042 # r = test_rest.test("VNFD8", "Show VNFD descriptor", "GET",
1043 # "/vnfpkgm/v1/vnf_packages/{}/vnfd".format(vnfd_id),
1044 # headers_text, None, 200, r_header_text, "text")
1045 # # vnfd SHOW actifact
1046 # r = test_rest.test("VNFD9", "Show VNFD artifact", "GET",
1047 # "/vnfpkgm/v1/vnf_packages/{}/artifacts/icons/cirros-64.png".format(vnfd_id),
1048 # headers_text, None, 200, r_header_octect, "text")
1049 #
1050 # # # vnfd DELETE
1051 # # r = test_rest.test("VNFD10", "Delete VNFD SOL005 text", "DELETE",
1052 # # "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id),
1053 # # headers_yaml, None, 204, None, 0)
1054 #
1055 # # nsd CREATE
1056 # r = test_rest.test("NSD1", "Onboard NSD step 1", "POST", "/nsd/v1/ns_descriptors", headers_json, None,
1057 # 201, {"Location": "/nsd/v1/ns_descriptors/", "Content-Type": "application/json"}, "json")
1058 # location = r.headers["Location"]
1059 # nsd_id = location[location.rfind("/")+1:]
1060 # # print(location, nsd_id)
1061 #
1062 # # nsd UPLOAD test
1063 # r = test_rest.test("NSD2", "Onboard NSD with missing vnfd", "PUT",
1064 # "/nsd/v1/ns_descriptors/<>/nsd_content?constituent-vnfd.0.vnfd-id-ref"
1065 # "=NONEXISTING-VNFD".format(nsd_id),
1066 # r_header_text, "@./cirros_ns/cirros_nsd.yaml", 409, r_header_yaml, "yaml")
1067 #
1068 # # # VNF_CREATE
1069 # # r = test_rest.test("VNFD5", "Onboard VNFD step 3 replace with ZIP", "PUT",
1070 # # "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
1071 # # r_header_zip, "@b./temp/cirros_vnf.tar.gz", 204, None, 0)
1072 #
1073 # r = test_rest.test("NSD2", "Onboard NSD step 2 as TEXT", "PUT",
1074 # "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
1075 # r_header_text, "@./cirros_ns/cirros_nsd.yaml", 204, None, 0)
1076 #
1077 # # nsd SHOW OSM format
1078 # r = test_rest.test("NSD3", "Show NSD OSM format", "GET", "/nsd/v1/ns_descriptors_content/{}".format(nsd_id),
1079 # headers_json, None, 200, r_header_json, "json")
1080 #
1081 # # nsd SHOW text
1082 # r = test_rest.test("NSD4", "Show NSD SOL005 text", "GET",
1083 # "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
1084 # headers_text, None, 200, r_header_text, "text")
1085 #
1086 # # nsd UPLOAD ZIP
1087 # makedirs("temp", exist_ok=True)
1088 # tar = tarfile.open("temp/cirros_ns.tar.gz", "w:gz")
1089 # tar.add("cirros_ns")
1090 # tar.close()
1091 # r = test_rest.test("NSD5", "Onboard NSD step 3 replace with ZIP", "PUT",
1092 # "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
1093 # r_header_zip, "@b./temp/cirros_ns.tar.gz", 204, None, 0)
1094 #
1095 # # nsd SHOW OSM format
1096 # r = test_rest.test("NSD6", "Show NSD OSM format", "GET", "/nsd/v1/ns_descriptors_content/{}".format(nsd_id),
1097 # headers_json, None, 200, r_header_json, "json")
1098 #
1099 # # nsd SHOW zip
1100 # r = test_rest.test("NSD7","Show NSD SOL005 zip","GET", "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
1101 # headers_zip, None, 200, r_header_zip, "zip")
1102 #
1103 # # nsd SHOW descriptor
1104 # r = test_rest.test("NSD8", "Show NSD descriptor", "GET", "/nsd/v1/ns_descriptors/{}/nsd".format(nsd_id),
1105 # headers_text, None, 200, r_header_text, "text")
1106 # # nsd SHOW actifact
1107 # r = test_rest.test("NSD9", "Show NSD artifact", "GET",
1108 # "/nsd/v1/ns_descriptors/{}/artifacts/icons/osm_2x.png".format(nsd_id),
1109 # headers_text, None, 200, r_header_octect, "text")
1110 #
1111 # # vnfd DELETE
1112 # r = test_rest.test("VNFD10", "Delete VNFD conflict", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id),
1113 # headers_yaml, None, 409, r_header_yaml, "yaml")
1114 #
1115 # # nsd DELETE
1116 # r = test_rest.test("NSD10", "Delete NSD SOL005 text", "DELETE", "/nsd/v1/ns_descriptors/{}".format(nsd_id),
1117 # headers_yaml, None, 204, None, 0)
1118 #
1119 # # vnfd DELETE
1120 # r = test_rest.test("VNFD10","Delete VNFD SOL005 text","DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id),
1121 # headers_yaml, None, 204, None, 0)
1122 print("PASS")
1123
1124 except TestException as e:
1125 logger.error(test + "Test {} Exception: {}".format(test, str(e)))
1126 exit(1)
1127 except getopt.GetoptError as e:
1128 logger.error(e)
1129 print(e, file=sys.stderr)
1130 exit(1)
1131 except Exception as e:
1132 logger.critical(test + " Exception: " + str(e), exc_info=True)