from random import choice as random_choice
from uuid import uuid4
from hashlib import sha256, md5
-from osm_common.dbbase import DbException
+from osm_common.dbbase import DbException, deep_update
from osm_common.fsbase import FsException
from osm_common.msgbase import MsgException
from http import HTTPStatus
Exception.__init__(self, message)
-def _deep_update(dict_to_change, dict_reference):
- """
- Modifies one dictionary with the information of the other following https://tools.ietf.org/html/rfc7396
- :param dict_to_change: Ends modified
- :param dict_reference: reference
- :return: none
- """
- for k in dict_reference:
- if dict_reference[k] is None: # None->Anything
- if k in dict_to_change:
- del dict_to_change[k]
- elif not isinstance(dict_reference[k], dict): # NotDict->Anything
- dict_to_change[k] = dict_reference[k]
- elif k not in dict_to_change: # Dict->Empty
- dict_to_change[k] = deepcopy(dict_reference[k])
- _deep_update(dict_to_change[k], dict_reference[k])
- elif isinstance(dict_to_change[k], dict): # Dict->Dict
- _deep_update(dict_to_change[k], dict_reference[k])
- else: # Dict->NotDict
- dict_to_change[k] = deepcopy(dict_reference[k])
- _deep_update(dict_to_change[k], dict_reference[k])
-
-
def get_iterable(input):
"""
Returns an iterable, in case input is None it just returns an empty tuple
clean_indata = clean_indata['userDefinedData']
return clean_indata
+ def _check_project_dependencies(self, project_id):
+ """
+ Check if a project can be deleted
+ :param session:
+ :param _id:
+ :return:
+ """
+ # TODO Is it needed to check descriptors _admin.project_read/project_write??
+ _filter = {"projects": project_id}
+ if self.db.get_list("users", _filter):
+ raise EngineException("There are users that uses this project", http_code=HTTPStatus.CONFLICT)
+
def _check_dependencies_on_descriptor(self, session, item, descriptor_id, _id):
"""
Check that the descriptor to be deleded is not a dependency of others
raise EngineException("Descriptor error at nsdId='{}' references a non exist nsd".format(nsd_id),
http_code=HTTPStatus.CONFLICT)
+ def _check_edition(self, session, item, indata, id, force=False):
+ if item == "users":
+ if indata.get("projects"):
+ if not session["admin"]:
+ raise EngineException("Needed admin privileges to edit user projects", HTTPStatus.UNAUTHORIZED)
+ if indata.get("password"):
+ # regenerate salt and encrypt password
+ salt = uuid4().hex
+ indata["_admin"] = {"salt": salt}
+ indata["password"] = sha256(indata["password"].encode('utf-8') + salt.encode('utf-8')).hexdigest()
+
def _validate_new_data(self, session, item, indata, id=None, force=False):
if item == "users":
- if not indata.get("username"):
- raise EngineException("missing 'username'", HTTPStatus.UNPROCESSABLE_ENTITY)
- if not indata.get("password"):
- raise EngineException("missing 'password'", HTTPStatus.UNPROCESSABLE_ENTITY)
- if not indata.get("projects"):
- raise EngineException("missing 'projects'", HTTPStatus.UNPROCESSABLE_ENTITY)
# check username not exists
- if self.db.get_one(item, {"username": indata.get("username")}, fail_on_empty=False, fail_on_more=False):
+ if not id and self.db.get_one(item, {"username": indata.get("username")}, fail_on_empty=False,
+ fail_on_more=False):
raise EngineException("username '{}' exists".format(indata["username"]), HTTPStatus.CONFLICT)
+ # check projects
+ if not force:
+ for p in indata["projects"]:
+ if p == "admin":
+ continue
+ if not self.db.get_one("projects", {"_id": p}, fail_on_empty=False, fail_on_more=False):
+ raise EngineException("project '{}' does not exists".format(p), HTTPStatus.CONFLICT)
elif item == "projects":
if not indata.get("name"):
raise EngineException("missing 'name'")
# check name not exists
- if self.db.get_one(item, {"name": indata.get("name")}, fail_on_empty=False, fail_on_more=False):
+ if not id and self.db.get_one(item, {"name": indata.get("name")}, fail_on_empty=False, fail_on_more=False):
raise EngineException("name '{}' exists".format(indata["name"]), HTTPStatus.CONFLICT)
elif item in ("vnfds", "nsds"):
filter = {"id": indata["id"]}
:return: _id: identity of the inserted data.
"""
+ if not session["admin"] and item in ("users", "projects"):
+ raise EngineException("Needed admin privileges to perform this operation", HTTPStatus.UNAUTHORIZED)
+
try:
item_envelop = item
if item in ("nsds", "vnfds"):
# Override descriptor with query string kwargs
self._update_descriptor(content, kwargs)
- if not indata and item not in ("nsds", "vnfds"):
+ if not content and item not in ("nsds", "vnfds"):
raise EngineException("Empty payload")
validate_input(content, item, new=True)
# in this case the input descriptor is not the data to be stored
return self.new_nsr(rollback, session, ns_request=content)
- self._validate_new_data(session, item_envelop, content, force)
+ self._validate_new_data(session, item_envelop, content, force=force)
if item in ("nsds", "vnfds"):
content = {"_admin": {"userDefinedData": content}}
self._format_new_data(session, item, content)
# raise EngineException("Cannot get ns_instance '{}': {}".format(e), HTTPStatus.NOT_FOUND)
def _add_read_filter(self, session, item, filter):
- if session["project_id"] == "admin": # allows all
+ if session["admin"]: # allows all
return filter
if item == "users":
filter["username"] = session["username"]
filter["_admin.projects_read.cont"] = ["ANY", session["project_id"]]
def _add_delete_filter(self, session, item, filter):
- if session["project_id"] != "admin" and item in ("users", "projects"):
+ if not session["admin"] and item in ("users", "projects"):
raise EngineException("Only admin users can perform this task", http_code=HTTPStatus.FORBIDDEN)
if item == "users":
if filter.get("_id") == session["username"] or filter.get("username") == session["username"]:
elif item == "project":
if filter.get("_id") == session["project_id"]:
raise EngineException("You cannot delete your own project", http_code=HTTPStatus.CONFLICT)
- elif item in ("vnfds", "nsds") and session["project_id"] != "admin":
+ elif item in ("vnfds", "nsds") and not session["admin"]:
filter["_admin.projects_write.cont"] = ["ANY", session["project_id"]]
def get_file(self, session, item, _id, path=None, accept_header=None):
descriptor_id = descriptor.get("id")
if descriptor_id:
self._check_dependencies_on_descriptor(session, item, descriptor_id, _id)
+ elif item == "projects":
+ if not force:
+ self._check_project_dependencies(_id)
if item == "nsrs":
nsr = self.db.get_one(item, filter)
except ValidationError as e:
raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
- _deep_update(content, indata)
- self._validate_new_data(session, item, content, id, force)
+ self._check_edition(session, item, indata, id, force)
+ deep_update(content, indata)
+ self._validate_new_data(session, item, content, id=id, force=force)
# self._format_new_data(session, item, content)
self.db.replace(item, id, content)
if item in ("vim_accounts", "sdns"):
:param force: If True avoid some dependence checks
:return: dictionary, raise exception if not found.
"""
+ if not session["admin"] and item == "projects":
+ raise EngineException("Needed admin privileges to perform this operation", HTTPStatus.UNAUTHORIZED)
content = self.get_item(session, item, _id)
return self._edit_item(session, item, _id, content, indata, kwargs, force)
def set_header(self, header):
self.s.headers.update(header)
+ def unset_header(self, key):
+ if key in self.s.headers:
+ del self.s.headers[key]
+
def test(self, name, description, method, url, headers, payload, expected_codes, expected_headers,
expected_payload):
"""
# self.project = project
r = self.test("TOKEN", "Obtain token", "POST", "/admin/v1/tokens", headers_json,
{"username": self.user, "password": self.password, "project_id": self.project},
- (200, 201), {"Content-Type": "application/json"}, "json")
+ (200, 201), r_header_json, "json")
response = r.json()
self.token = response["id"]
self.set_header({"Authorization": "Bearer {}".format(self.token)})
+ def remove_authorization(self):
+ if self.token:
+ self.test("TOKEN_DEL", "Delete token", "DELETE", "/admin/v1/tokens/{}".format(self.token), headers_json,
+ None, (200, 201, 204), None, None)
+ self.token = None
+ self.unset_header("Authorization")
+
def get_create_vim(self, test_osm):
if self.vim_id:
return self.vim_id
description = "test invalid URLs. methods and no authorization"
@staticmethod
- def run(engine, test_osm, manual_check):
+ def run(engine, test_osm, manual_check, test_params=None):
+ engine.remove_authorization()
test_not_authorized_list = (
("NA1", "Invalid token", "GET", "/admin/v1/users", headers_json, None, 401, r_header_json, "json"),
("NA2", "Invalid URL", "POST", "/admin/v1/nonexist", headers_yaml, None, 405, r_header_yaml, "yaml"),
engine.test(*t)
+class TestUsersProjects:
+ description = "test project and user creation"
+
+ @staticmethod
+ def run(engine, test_osm, manual_check, test_params=None):
+ engine.get_autorization()
+ engine.test("PU1", "Create project non admin", "POST", "/admin/v1/projects", headers_json, {"name": "P1"},
+ (201, 204), {"Location": "/admin/v1/projects/", "Content-Type": "application/json"}, "json")
+ engine.test("PU2", "Create project admin", "POST", "/admin/v1/projects", headers_json,
+ {"name": "Padmin", "admin": True}, (201, 204),
+ {"Location": "/admin/v1/projects/", "Content-Type": "application/json"}, "json")
+ engine.test("PU3", "Create project bad format", "POST", "/admin/v1/projects", headers_json, {"name": 1}, 422,
+ r_header_json, "json")
+ engine.test("PU4", "Create user with bad project", "POST", "/admin/v1/users", headers_json,
+ {"username": "U1", "projects": ["P1", "P2", "Padmin"], "password": "pw1"}, 409,
+ r_header_json, "json")
+ engine.test("PU5", "Create user with bad project and force", "POST", "/admin/v1/users?FORCE=True", headers_json,
+ {"username": "U1", "projects": ["P1", "P2", "Padmin"], "password": "pw1"}, 201,
+ {"Location": "/admin/v1/users/", "Content-Type": "application/json"}, "json")
+ engine.test("PU6", "Create user 2", "POST", "/admin/v1/users", headers_json,
+ {"username": "U2", "projects": ["P1"], "password": "pw2"}, 201,
+ {"Location": "/admin/v1/users/", "Content-Type": "application/json"}, "json")
+
+ engine.test("PU7", "Edit user U1, delete P2 project", "PATCH", "/admin/v1/users/U1", headers_json,
+ {"projects": {"$'P2'": None}}, 204, None, None)
+ res = engine.test("PU1", "Check user U1, contains the right projects", "GET", "/admin/v1/users/U1",
+ headers_json, None, 200, None, json)
+ u1 = res.json()
+ # print(u1)
+ expected_projects = ["P1", "Padmin"]
+ if u1["projects"] != expected_projects:
+ raise TestException("User content projects '{}' different than expected '{}'. Edition has not done"
+ " properly".format(u1["projects"], expected_projects))
+
+ engine.test("PU8", "Edit user U1, set Padmin as default project", "PUT", "/admin/v1/users/U1", headers_json,
+ {"projects": {"$'Padmin'": None, "$+[0]": "Padmin"}}, 204, None, None)
+ res = engine.test("PU1", "Check user U1, contains the right projects", "GET", "/admin/v1/users/U1",
+ headers_json, None, 200, None, json)
+ u1 = res.json()
+ # print(u1)
+ expected_projects = ["Padmin", "P1"]
+ if u1["projects"] != expected_projects:
+ raise TestException("User content projects '{}' different than expected '{}'. Edition has not done"
+ " properly".format(u1["projects"], expected_projects))
+
+ engine.test("PU9", "Edit user U1, change password", "PATCH", "/admin/v1/users/U1", headers_json,
+ {"password": "pw1_new"}, 204, None, None)
+
+ engine.test("PU10", "Change to project P1 non existing", "POST", "/admin/v1/tokens/", headers_json,
+ {"project_id": "P1"}, 401, r_header_json, "json")
+
+ res = engine.test("PU1", "Change to user U1 project P1", "POST", "/admin/v1/tokens", headers_json,
+ {"username": "U1", "password": "pw1_new", "project_id": "P1"}, (200, 201),
+ r_header_json, "json")
+ response = res.json()
+ engine.set_header({"Authorization": "Bearer {}".format(response["id"])})
+
+ engine.test("PU11", "Edit user projects non admin", "PUT", "/admin/v1/users/U1", headers_json,
+ {"projects": {"$'P1'": None}}, 401, r_header_json, "json")
+ engine.test("PU12", "Add new project non admin", "POST", "/admin/v1/projects", headers_json,
+ {"name": "P2"}, 401, r_header_json, "json")
+ engine.test("PU13", "Add new user non admin", "POST", "/admin/v1/users", headers_json,
+ {"username": "U3", "projects": ["P1"], "password": "pw3"}, 401,
+ r_header_json, "json")
+
+ res = engine.test("PU14", "Change to user U1 project Padmin", "POST", "/admin/v1/tokens", headers_json,
+ {"project_id": "Padmin"}, (200, 201), r_header_json, "json")
+ response = res.json()
+ engine.set_header({"Authorization": "Bearer {}".format(response["id"])})
+
+ engine.test("PU15", "Add new project admin", "POST", "/admin/v1/projects", headers_json, {"name": "P2"},
+ (201, 204), {"Location": "/admin/v1/projects/", "Content-Type": "application/json"}, "json")
+ engine.test("PU16", "Add new user U3 admin", "POST", "/admin/v1/users",
+ headers_json, {"username": "U3", "projects": ["P2"], "password": "pw3"}, (201, 204),
+ {"Location": "/admin/v1/users/", "Content-Type": "application/json"}, "json")
+ engine.test("PU17", "Edit user projects admin", "PUT", "/admin/v1/users/U3", headers_json,
+ {"projects": ["P2"]}, 204, None, None)
+
+ engine.test("PU18", "Delete project P2 conflict", "DELETE", "/admin/v1/projects/P2", headers_json, None, 409,
+ r_header_json, "json")
+ engine.test("PU19", "Delete project P2 forcing", "DELETE", "/admin/v1/projects/P2?FORCE=True", headers_json,
+ None, 204, None, None)
+
+ engine.test("PU20", "Delete user U1. Conflict deleting own user", "DELETE", "/admin/v1/users/U1", headers_json,
+ None, 409, r_header_json, "json")
+ engine.test("PU21", "Delete user U2", "DELETE", "/admin/v1/users/U2", headers_json, None, 204, None, None)
+ engine.test("PU22", "Delete user U3", "DELETE", "/admin/v1/users/U3", headers_json, None, 204, None, None)
+ # change to admin
+ engine.remove_authorization() # To force get authorization
+ engine.get_autorization()
+ engine.test("PU23", "Delete user U1", "DELETE", "/admin/v1/users/U1", headers_json, None, 204, None, None)
+ engine.test("PU24", "Delete project P1", "DELETE", "/admin/v1/projects/P1", headers_json, None, 204, None, None)
+ engine.test("PU25", "Delete project Padmin", "DELETE", "/admin/v1/projects/Padmin", headers_json, None, 204,
+ None, None)
+
+
class TestFakeVim:
description = "Creates/edit/delete fake VIMs and SDN controllers"
]}
]
- def run(self, engine, test_osm, manual_check):
+ def run(self, engine, test_osm, manual_check, test_params=None):
vim_bad = self.vim.copy()
vim_bad.pop("name")
def __init__(self):
TestFakeVim.__init__(self)
- def run(self, engine, test_osm, manual_check):
+ def run(self, engine, test_osm, manual_check, test_params=None):
engine.get_autorization()
# Added SDN
- engine.test("SDN1", "Create SDN", "POST", "/admin/v1/sdns", headers_json, self.sdn, (201, 204),
+ engine.test("VIMSDN1", "Create SDN", "POST", "/admin/v1/sdns", headers_json, self.sdn, (201, 204),
{"Location": "/admin/v1/sdns/", "Content-Type": "application/json"}, "json")
- sleep(5)
+ # sleep(5)
# Edit SDN
- engine.test("SDN1", "Edit SDN", "PATCH", "/admin/v1/sdns/<SDN1>", headers_json, {"name": "new_sdn_name"},
- (200, 201, 204), r_header_json, "json")
- sleep(5)
+ engine.test("VIMSDN2", "Edit SDN", "PATCH", "/admin/v1/sdns/<VIMSDN1>", headers_json, {"name": "new_sdn_name"},
+ 204, None, None)
+ # sleep(5)
# VIM with SDN
- self.vim["config"]["sdn-controller"] = engine.test_ids["SDN1"]
+ self.vim["config"]["sdn-controller"] = engine.test_ids["VIMSDN1"]
self.vim["config"]["sdn-port-mapping"] = self.port_mapping
- engine.test("VIM2", "Create VIM", "POST", "/admin/v1/vim_accounts", headers_json, self.vim, (200, 204, 201),
+ engine.test("VIMSDN3", "Create VIM", "POST", "/admin/v1/vim_accounts", headers_json, self.vim, (200, 204, 201),
{"Location": "/admin/v1/vim_accounts/", "Content-Type": "application/json"}, "json"),
self.port_mapping[0]["compute_node"] = "compute node XX"
- engine.test("VIMSDN2", "Edit VIM change port-mapping", "PUT", "/admin/v1/vim_accounts/<VIM2>", headers_json,
- {"config": {"sdn-port-mapping": self.port_mapping}},
- (200, 201, 204), {"Content-Type": "application/json"}, "json")
- engine.test("VIMSDN3", "Edit VIM remove port-mapping", "PUT", "/admin/v1/vim_accounts/<VIM2>", headers_json,
- {"config": {"sdn-port-mapping": None}},
- (200, 201, 204), {"Content-Type": "application/json"}, "json")
- engine.test("VIMSDN4", "Delete VIM remove port-mapping", "DELETE", "/admin/v1/vim_accounts/<VIM2>",
- headers_json, None, (202, 201, 204), None, 0)
- engine.test("VIMSDN5", "Delete SDN", "DELETE", "/admin/v1/sdns/<SDN1>", headers_json, None,
- (202, 201, 204), None, 0)
+ engine.test("VIMSDN4", "Edit VIM change port-mapping", "PUT", "/admin/v1/vim_accounts/<VIMSDN3>", headers_json,
+ {"config": {"sdn-port-mapping": self.port_mapping}}, 204, None, None)
+ engine.test("VIMSDN5", "Edit VIM remove port-mapping", "PUT", "/admin/v1/vim_accounts/<VIMSDN3>", headers_json,
+ {"config": {"sdn-port-mapping": None}}, 204, None, None)
+
+ if not test_osm:
+ # delete with FORCE
+ engine.test("VIMSDN6", "Delete VIM remove port-mapping", "DELETE",
+ "/admin/v1/vim_accounts/<VIMSDN3>?FORCE=True", headers_json, None, 202, None, 0)
+ engine.test("VIMSDN7", "Delete SDNC", "DELETE", "/admin/v1/sdns/<VIMSDN1>?FORCE=True", headers_json, None,
+ 202, None, 0)
+
+ engine.test("VIMSDN8", "Check VIM is deleted", "GET", "/admin/v1/vim_accounts/<VIMSDN3>", headers_yaml,
+ None, 404, r_header_yaml, "yaml")
+ engine.test("VIMSDN9", "Check SDN is deleted", "GET", "/admin/v1/sdns/<VIMSDN1>", headers_yaml, None,
+ 404, r_header_yaml, "yaml")
+ else:
+ # delete and wait until is really deleted
+ engine.test("VIMSDN6", "Delete VIM remove port-mapping", "DELETE", "/admin/v1/vim_accounts/<VIMSDN3>",
+ headers_json, None, (202, 201, 204), None, 0)
+ engine.test("VIMSDN7", "Delete SDN", "DELETE", "/admin/v1/sdns/<VIMSDN1>", headers_json, None,
+ (202, 201, 204), None, 0)
+ wait = timeout
+ while wait >= 0:
+ r = engine.test("VIMSDN8", "Check VIM is deleted", "GET", "/admin/v1/vim_accounts/<VIMSDN3>",
+ headers_yaml, None, None, r_header_yaml, "yaml")
+ if r.status_code == 404:
+ break
+ elif r.status_code == 200:
+ wait -= 5
+ sleep(5)
+ else:
+ raise TestException("Vim created at 'VIMSDN3' is not delete after {} seconds".format(timeout))
+ while wait >= 0:
+ r = engine.test("VIMSDN9", "Check SDNC is deleted", "GET", "/admin/v1/sdns/<VIMSDN1>",
+ headers_yaml, None, None, r_header_yaml, "yaml")
+ if r.status_code == 404:
+ break
+ elif r.status_code == 200:
+ wait -= 5
+ sleep(5)
+ else:
+ raise TestException("SDNC created at 'VIMSDN1' is not delete after {} seconds".format(timeout))
class TestDeploy:
default-value: '/home/ubuntu/touched'
"""
engine.test("DEPLOY{}".format(self.step), "Edit VNFD ", "PATCH",
- "/vnfpkgm/v1/vnf_packages/<{}>".format(self.vnfds_test[0]),
- headers_yaml, payload, 200,
- r_header_yaml, yaml)
- self.vnfds_test.append("DEPLOY" + str(self.step))
+ "/vnfpkgm/v1/vnf_packages/<{}>".format(self.vnfds_test[0]), headers_yaml, payload, 204, None, None)
self.step += 1
def run(self, engine, test_osm, manual_check, test_params=None):
test_classes = {
"NonAuthorized": TestNonAuthorized,
"FakeVIM": TestFakeVim,
+ "TestUsersProjects": TestUsersProjects,
"VIM-SDN": TestVIMSDN,
"Deploy-Custom": TestDeploy,
"Deploy-Hackfest-Cirros": TestDeployHackfestCirros,
# Basis schemas
patern_name = "^[ -~]+$"
+nameshort_schema = {"type": "string", "minLength": 1, "maxLength": 60, "pattern": "^[^,;()\.\$'\"]+$"}
passwd_schema = {"type": "string", "minLength": 1, "maxLength": 60}
-nameshort_schema = {"type": "string", "minLength": 1, "maxLength": 60, "pattern": "^[^,;()'\"]+$"}
name_schema = {"type": "string", "minLength": 1, "maxLength": 255, "pattern": "^[^,;()'\"]+$"}
string_schema = {"type": "string", "minLength": 1, "maxLength": 255}
xml_text_schema = {"type": "string", "minLength": 1, "maxLength": 1000, "pattern": "^[^']+$"}
log_level_schema = {"type": "string", "enum": ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]}
checksum_schema = {"type": "string", "pattern": "^[0-9a-fA-F]{32}$"}
size_schema = {"type": "integer", "minimum": 1, "maximum": 100}
+array_edition_schema = {
+ "type": "object",
+ "patternProperties": {
+ "^\$": "Any"
+ },
+ "additionalProperties": False,
+ "minProperties": 1,
+}
+
ns_instantiate_vdu = {
"title": "ns action instantiate input schema for vdu",
"vim_tenant": name_schema,
"vim_tenant_name": name_schema,
"vim_username": nameshort_schema,
- "vim_password": nameshort_schema,
+ "vim_password": passwd_schema,
"config": {"type": "object"}
},
"additionalProperties": False
# "vim_tenant": name_schema,
"vim_tenant_name": name_schema,
"vim_user": nameshort_schema,
- "vim_password": nameshort_schema,
+ "vim_password": passwd_schema,
"config": {"type": "object"}
},
"required": ["name", "vim_url", "vim_type", "vim_user", "vim_password", "vim_tenant_name"],
}
sdn_external_port_schema = {
"$schema": "http://json-schema.org/draft-04/schema#",
- "title": "External port ingformation",
+ "title": "External port information",
"type": "object",
"properties": {
"port": {"type": "string", "minLength": 1, "maxLength": 60},
"required": ["port"]
}
+# USERS
+user_project_schema = {
+ "type": "array",
+ "minItems": 1,
+ "items": nameshort_schema,
+}
+user_new_schema = {
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "title": "New user schema",
+ "type": "object",
+ "properties": {
+ "username": nameshort_schema,
+ "password": passwd_schema,
+ "projects": user_project_schema,
+ },
+ "required": ["username", "password", "projects"],
+ "additionalProperties": False
+}
+user_edit_schema = {
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "title": "User edit schema for administrators",
+ "type": "object",
+ "properties": {
+ "password": passwd_schema,
+ "projects": {
+ "oneOff": [
+ user_project_schema,
+ array_edition_schema
+ ]
+ },
+ }
+}
+
+# PROJECTS
+project_new_schema = {
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "title": "New project schema for administrators",
+ "type": "object",
+ "properties": {
+ "name": nameshort_schema,
+ "admin": bool_schema,
+ },
+ "required": ["name"],
+ "additionalProperties": False
+}
+project_edit_schema = {
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "title": "Project edit schema for administrators",
+ "type": "object",
+ "properties": {
+ "admin": bool_schema,
+ },
+ "additionalProperties": False,
+ "minProperties": 1
+}
+
+# GLOBAL SCHEMAS
nbi_new_input_schemas = {
+ "users": user_new_schema,
+ "projects": project_new_schema,
"vim_accounts": vim_account_new_schema,
"sdns": sdn_new_schema,
"ns_instantiate": ns_instantiate,
}
nbi_edit_input_schemas = {
+ "users": user_edit_schema,
+ "projects": project_edit_schema,
"vim_accounts": vim_account_edit_schema,
"sdns": sdn_edit_schema
}
def validate_input(indata, item, new=True):
"""
- Validates input data agains json schema
+ Validates input data against json schema
:param indata: user input data. Should be a dictionary
:param item: can be users, projects, vims, sdns, ns_xxxxx
:param new: True if the validation is for creating or False if it is for editing