from validation import wim_account_new_schema, wim_account_edit_schema, roles_new_schema, roles_edit_schema
from validation import validate_input
from validation import ValidationError
+from validation import is_valid_uuid # To check that User/Project Names don't look like UUIDs
from base_topic import BaseTopic, EngineException
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
for p in indata["projects"]:
if p == "admin":
continue
- if not self.db.get_one("projects", {"_id": p}, fail_on_empty=False, fail_on_more=False):
- raise EngineException("project '{}' does not exists".format(p), HTTPStatus.CONFLICT)
+ # To allow project addressing by Name as well as ID
+ if not self.db.get_one("projects", {BaseTopic.id_field("projects", p): p}, fail_on_empty=False,
+ fail_on_more=False):
+ raise EngineException("project '{}' does not exist".format(p), HTTPStatus.CONFLICT)
def check_conflict_on_del(self, session, _id, force=False):
if _id == session["username"]:
@staticmethod
def format_on_new(content, project_id=None, make_public=False):
BaseTopic.format_on_new(content, make_public=False)
- content["_id"] = content["username"]
+ # Removed so that the UUID is kept, to allow User Name modification
+ # content["_id"] = content["username"]
salt = uuid4().hex
content["_admin"]["salt"] = salt
if content.get("password"):
def edit(self, session, _id, indata=None, kwargs=None, force=False, content=None):
if not session["admin"]:
raise EngineException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED)
+ # Names that look like UUIDs are not allowed
+ name = (indata if indata else kwargs).get("username")
+ if is_valid_uuid(name):
+ raise EngineException("Usernames that look like UUIDs are not allowed",
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
return BaseTopic.edit(self, session, _id, indata=indata, kwargs=kwargs, force=force, content=content)
def new(self, rollback, session, indata=None, kwargs=None, headers=None, force=False, make_public=False):
if not session["admin"]:
raise EngineException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED)
+ # Names that look like UUIDs are not allowed
+ name = indata["username"] if indata else kwargs["username"]
+ if is_valid_uuid(name):
+ raise EngineException("Usernames that look like UUIDs are not allowed",
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
return BaseTopic.new(self, rollback, session, indata=indata, kwargs=kwargs, headers=headers, force=force,
make_public=make_public)
@staticmethod
def format_on_new(content, project_id=None, make_public=False):
BaseTopic.format_on_new(content, None)
- content["_id"] = content["name"]
+ # Removed so that the UUID is kept, to allow Project Name modification
+ # content["_id"] = content["name"]
def check_conflict_on_del(self, session, _id, force=False):
if _id == session["project_id"]:
def edit(self, session, _id, indata=None, kwargs=None, force=False, content=None):
if not session["admin"]:
raise EngineException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED)
+ # Names that look like UUIDs are not allowed
+ name = (indata if indata else kwargs).get("name")
+ if is_valid_uuid(name):
+ raise EngineException("Project names that look like UUIDs are not allowed",
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
return BaseTopic.edit(self, session, _id, indata=indata, kwargs=kwargs, force=force, content=content)
def new(self, rollback, session, indata=None, kwargs=None, headers=None, force=False, make_public=False):
if not session["admin"]:
raise EngineException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED)
+ # Names that look like UUIDs are not allowed
+ name = indata["name"] if indata else kwargs["name"]
+ if is_valid_uuid(name):
+ raise EngineException("Project names that look like UUIDs are not allowed",
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
return BaseTopic.new(self, rollback, session, indata=indata, kwargs=kwargs, headers=headers, force=force,
make_public=make_public)
if role_def[-1] == ".":
raise ValidationError("Operation cannot end with \".\"")
- role_def_matches = [op for op in operations if op.starswith(role_def)]
+ role_def_matches = [op for op in operations if op.startswith(role_def)]
if len(role_def_matches) == 0:
raise ValidationError("No matching operation found.")
from random import choice as random_choice
from time import time
from os import path
+from base_topic import BaseTopic # To allow project names in project_id
from authconn import AuthException
from authconn_keystone import AuthconnKeystone
token_id = ''.join(random_choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789')
for _ in range(0, 32))
- if indata.get("project_id"):
- project_id = indata.get("project_id")
- if project_id not in user_content["projects"]:
- raise AuthException("project {} not allowed for this user"
- .format(project_id), http_code=HTTPStatus.UNAUTHORIZED)
+ project_id = indata.get("project_id")
+ if project_id:
+ if project_id != "admin":
+ # To allow project names in project_id
+ proj = self.db.get_one("projects", {BaseTopic.id_field("projects", project_id): project_id})
+ if proj["_id"] not in user_content["projects"] and proj["name"] not in user_content["projects"]:
+ raise AuthException("project {} not allowed for this user"
+ .format(project_id), http_code=HTTPStatus.UNAUTHORIZED)
else:
project_id = user_content["projects"][0]
if project_id == "admin":
session_admin = True
else:
- project = self.db.get_one("projects", {"_id": project_id})
+ # To allow project names in project_id
+ project = self.db.get_one("projects", {BaseTopic.id_field("projects", project_id): project_id})
session_admin = project.get("admin", False)
new_session = {"issued_at": now, "expires": now + 3600,
"_id": token_id, "id": token_id, "project_id": project_id, "username": user_content["username"],
from http import HTTPStatus
from time import time
from osm_common.dbbase import deep_update_rfc7396
-from validation import validate_input, ValidationError
+from validation import validate_input, ValidationError, is_valid_uuid
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
schema_new = None # to_override
schema_edit = None # to_override
+ # Alternative ID Fields for some Topics
+ alt_id_field = {
+ "projects": "name",
+ "users": "username"
+ }
+
def __init__(self, db, fs, msg):
self.db = db
self.fs = fs
self.msg = msg
self.logger = logging.getLogger("nbi.engine")
+ @staticmethod
+ def id_field(topic, value):
+ "Returns ID Field for given topic and field value"
+ if topic in ["projects", "users"] and not is_valid_uuid(value):
+ return BaseTopic.alt_id_field[topic]
+ else:
+ return "_id"
+
@staticmethod
def _remove_envelop(indata=None):
if not indata:
Update descriptor with the kwargs. It contains dot separated keys
:param desc: dictionary to be updated
:param kwargs: plain dictionary to be used for updating.
- :return: None, 'desc' is modified. It raises EngineException.
+ :return: None, 'desc' is modified. It raises EngineException.
"""
if not kwargs:
return
:return: dictionary, raise exception if not found.
"""
filter_db = self._get_project_filter(session, write=False, show_all=True)
- filter_db["_id"] = _id
+ # To allow project&user addressing by name AS WELL AS _id
+ filter_db[BaseTopic.id_field(self.topic, _id)] = _id
return self.db.get_one(self.topic, filter_db)
# TODO transform data for SOL005 URL requests
# TODO remove _admin if not admin
# data = self.get_item(topic, _id)
self.check_conflict_on_del(session, _id, force)
filter_q = self._get_project_filter(session, write=True, show_all=True)
- filter_q["_id"] = _id
+ # To allow project addressing by name AS WELL AS _id
+ filter_q[BaseTopic.id_field(self.topic, _id)] = _id
if not dry_run:
v = self.db.del_one(self.topic, filter_q)
self._send_msg("deleted", {"_id": _id})
deep_update_rfc7396(content, indata)
self.check_conflict_on_edit(session, content, indata, _id=_id, force=force)
self.format_on_edit(content, indata)
- self.db.replace(self.topic, _id, content)
+ # To allow project addressing by name AS WELL AS _id
+ # self.db.replace(self.topic, _id, content)
+ cid = content.get("_id")
+ self.db.replace(self.topic, cid if cid else _id, content)
indata.pop("_admin", None)
indata["_id"] = _id
with self.write_lock:
return self.map_topic[topic].edit(session, _id, indata, kwargs, force)
- def create_admin(self):
+ def create_admin_project(self):
+ """
+ Creates a new project 'admin' into database if database is empty. Useful for initialization.
+ :return: _id identity of the inserted data, or None
+ """
+
+ projects = self.db.get_one("projects", fail_on_empty=False, fail_on_more=False)
+ if projects:
+ return None
+ project_desc = {"name": "admin"}
+ fake_session = {"project_id": "admin", "username": "admin", "admin": True}
+ rollback_list = []
+ _id = self.map_topic["projects"].new(rollback_list, fake_session, project_desc, force=True)
+ return _id
+
+ def create_admin_user(self):
"""
Creates a new user admin/admin into database if database is empty. Useful for initialization
:return: _id identity of the inserted data, or None
_id = self.map_topic["users"].new(roolback_list, fake_session, user_desc, force=True)
return _id
+ def create_admin(self):
+ """
+ Creates new 'admin' user and project into database if database is empty. Useful for initialization.
+ :return: _id identity of the inserted data, or None
+ """
+ project_id = self.create_admin_project()
+ user_id = self.create_admin_user()
+ if not project_id and not user_id:
+ return None
+ else:
+ return {'project_id': project_id, 'user_id': user_id}
+
def upgrade_db(self, current_version, target_version):
if not target_version or current_version == target_version:
return
"<ID>": {"METHODS": ("GET", "POST", "DELETE", "PATCH", "PUT")}
},
"projects": {"METHODS": ("GET", "POST"),
- "<ID>": {"METHODS": ("GET", "DELETE")}
+ # Added PUT to allow Project Name modification
+ "<ID>": {"METHODS": ("GET", "DELETE", "PUT")}
},
"roles": {"METHODS": ("GET", "POST"),
"<ID>": {"METHODS": ("GET", "POST", "DELETE")}
elif main_topic == "nsilcm":
engine_topic = "nsis"
if topic == "nsi_lcm_op_occs":
- engine_topic = "nsilcmops"
+ engine_topic = "nsilcmops"
elif main_topic == "pdu":
engine_topic = "pdus"
if engine_topic == "vims": # TODO this is for backward compatibility, it will remove in the future
indata["nsiInstanceId"] = _id
self.engine.new_item(rollback, session, "nsilcmops", indata, kwargs)
outdata = {"id": _id}
-
+
elif topic == "netslice_instances" and item:
indata["lcmOperationType"] = item
indata["nsiInstanceId"] = _id
self.test_name = test_name
self.step = 0
self.last_id = ""
-
+
def set_header(self, header):
self.s.headers.update(header)
engine.test("Create project admin", "POST", "/admin/v1/projects", headers_json,
{"name": "Padmin", "admin": True}, (201, 204),
{"Location": "/admin/v1/projects/", "Content-Type": "application/json"}, "json")
- engine.test("Create project bad format", "POST", "/admin/v1/projects", headers_json, {"name": 1}, 422,
+ engine.test("Create project bad format", "POST", "/admin/v1/projects", headers_json, {"name": 1}, (400, 422),
r_header_json, "json")
engine.test("Create user with bad project", "POST", "/admin/v1/users", headers_json,
{"username": "U1", "projects": ["P1", "P2", "Padmin"], "password": "pw1"}, 409,
# change to admin
engine.remove_authorization() # To force get authorization
engine.get_autorization()
- engine.test("Delete user U1", "DELETE", "/admin/v1/users/U1", headers_json, None, 204, None, None)
- engine.test("Delete project P1", "DELETE", "/admin/v1/projects/P1", headers_json, None, 204, None, None)
- engine.test("Delete project Padmin", "DELETE", "/admin/v1/projects/Padmin", headers_json, None, 204,
+ engine.test("Delete user U1 by Name", "DELETE", "/admin/v1/users/U1", headers_json, None, 204, None, None)
+ engine.test("Delete project P1 by Name", "DELETE", "/admin/v1/projects/P1", headers_json, None, 204, None, None)
+ engine.test("Delete project Padmin by Name", "DELETE", "/admin/v1/projects/Padmin", headers_json, None, 204,
None, None)
+ # BEGIN New Tests - Addressing Projects/Users by Name/ID
+ res = engine.test("Create new project P1", "POST", "/admin/v1/projects", headers_json, {"name": "P1"},
+ 201, {"Location": "/admin/v1/projects/", "Content-Type": "application/json"}, "json")
+ if res:
+ pid1 = res.json()["id"]
+ # print("# pid =", pid1)
+ res = engine.test("Create new project P2", "POST", "/admin/v1/projects", headers_json, {"name": "P2"},
+ 201, {"Location": "/admin/v1/projects/", "Content-Type": "application/json"}, "json")
+ if res:
+ pid2 = res.json()["id"]
+ # print("# pid =", pid2)
+ res = engine.test("Create new user U1", "POST", "/admin/v1/users", headers_json,
+ {"username": "U1", "projects": ["P1"], "password": "pw1"}, 201,
+ {"Location": "/admin/v1/users/", "Content-Type": "application/json"}, "json")
+ if res:
+ uid1 = res.json()["id"]
+ # print("# uid =", uid1)
+ res = engine.test("Create new user U2", "POST", "/admin/v1/users", headers_json,
+ {"username": "U2", "projects": ["P2"], "password": "pw2"}, 201,
+ {"Location": "/admin/v1/users/", "Content-Type": "application/json"}, "json")
+ if res:
+ uid2 = res.json()["id"]
+ # print("# uid =", uid2)
+ engine.test("Get Project P1 by Name", "GET", "/admin/v1/projects/P1", headers_json, None, 200, None, "json")
+ engine.test("Get Project P1 by ID", "GET", "/admin/v1/projects/"+pid1, headers_json, None, 200, None, "json")
+ engine.test("Get User U1 by Name", "GET", "/admin/v1/users/U1", headers_json, None, 200, None, "json")
+ engine.test("Get User U1 by ID", "GET", "/admin/v1/users/"+uid1, headers_json, None, 200, None, "json")
+ engine.test("Rename Project P1 by Name", "PUT", "/admin/v1/projects/P1", headers_json,
+ {"name": "P3"}, 204, None, None)
+ engine.test("Rename Project P2 by ID", "PUT", "/admin/v1/projects/"+pid2, headers_json,
+ {"name": "P4"}, 204, None, None)
+ engine.test("Rename User U1 by Name", "PUT", "/admin/v1/users/U1", headers_json,
+ {"username": "U3"}, 204, None, None)
+ engine.test("Rename User U2 by ID", "PUT", "/admin/v1/users/"+uid2, headers_json,
+ {"username": "U4"}, 204, None, None)
+ engine.test("Get Project P1 by new Name", "GET", "/admin/v1/projects/P3", headers_json, None, 200, None, "json")
+ engine.test("Get User U1 by new Name", "GET", "/admin/v1/users/U3", headers_json, None, 200, None, "json")
+ engine.test("Delete User U1 by Name", "DELETE", "/admin/v1/users/U3", headers_json, None, 204, None, None)
+ engine.test("Delete User U2 by ID", "DELETE", "/admin/v1/users/"+uid2, headers_json, None, 204, None, None)
+ engine.test("Delete Project P1 by Name", "DELETE", "/admin/v1/projects/P3", headers_json, None, 204, None,
+ None)
+ engine.test("Delete Project P2 by ID", "DELETE", "/admin/v1/projects/"+pid2, headers_json, None, 204, None,
+ None)
+ # END New Tests - Addressing Projects/Users by Name
+ engine.remove_authorization() # To finish
+
class TestFakeVim:
description = "Creates/edit/delete fake VIMs and SDN controllers"
test_class = test_classes[test]
test_class().run(test_rest, test_osm, manual_check, test_params.get(text_index))
else:
- for test, test_class in test_classes.items():
+ for test, test_class in sorted(test_classes.items()):
if fail_fast and test_rest.failed_tests:
break
test_class().run(test_rest, test_osm, manual_check, test_params.get(0))
from jsonschema import validate as js_v, exceptions as js_e
from http import HTTPStatus
from copy import deepcopy
+from uuid import UUID # To test for valid UUID
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
__version__ = "0.1"
"type": "object",
"properties": {
"password": passwd_schema,
+ "username": shortname_schema, # To allow User Name modification
"projects": {
"oneOf": [
nameshort_list_schema,
"type": "object",
"properties": {
"admin": bool_schema,
+ "name": shortname_schema, # To allow Project Name modification
},
"additionalProperties": False,
"minProperties": 1
"vim-network-id": {"OneOf": [string_schema, object_schema]},
"ip-profile": object_schema,
},
- "required": ["name"],
+ "required": ["name"],
"additionalProperties": False
}
}
nsi_terminate = {
-
+
}
raise ValidationError("Format error {} '{}' ".format(error_pos, e.message))
except js_e.SchemaError:
raise ValidationError("Bad json schema {}".format(schema_to_use), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+
+
+def is_valid_uuid(x):
+ """
+ Test for a valid UUID
+ :param x: string to test
+ :return: True if x is a valid uuid, False otherwise
+ """
+ try:
+ if UUID(x):
+ return True
+ except (TypeError, ValueError):
+ return False