from osm_common import dbmongo
from osm_common import dbmemory
from osm_common.dbbase import DbException
+from itertools import chain
from uuid import uuid4 # For Role _id with internal authentication backend
periodin_db_pruning = 60 * 30 # for the internal backend only. every 30 minutes expired tokens will be pruned
- def __init__(self):
+ def __init__(self, valid_methods, valid_query_string):
"""
Authenticator initializer. Setup the initial state of the object,
while it waits for the config dictionary and database initialization.
self.db = None
self.tokens_cache = dict()
self.next_db_prune_time = 0 # time when next cleaning of expired tokens must be done
- self.resources_to_operations_file = None
self.roles_to_operations_file = None
self.roles_to_operations_table = None
self.resources_to_operations_mapping = {}
self.operation_to_allowed_roles = {}
self.logger = logging.getLogger("nbi.authenticator")
- self.operations = []
+ self.role_permissions = []
+ self.valid_methods = valid_methods
+ self.valid_query_string = valid_query_string
def start(self, config):
"""
else:
raise AuthException("Unknown authentication backend: {}"
.format(config["authentication"]["backend"]))
- if not self.resources_to_operations_file:
- if "resources_to_operations" in config["rbac"]:
- self.resources_to_operations_file = config["rbac"]["resources_to_operations"]
- else:
- possible_paths = (
- __file__[:__file__.rfind("auth.py")] + "resources_to_operations.yml",
- "./resources_to_operations.yml"
- )
- for config_file in possible_paths:
- if path.isfile(config_file):
- self.resources_to_operations_file = config_file
- break
- if not self.resources_to_operations_file:
- raise AuthException("Invalid permission configuration: resources_to_operations file missing")
+
if not self.roles_to_operations_file:
if "roles_to_operations" in config["rbac"]:
self.roles_to_operations_file = config["rbac"]["roles_to_operations"]
if path.isfile(config_file):
self.roles_to_operations_file = config_file
break
- if not self.roles_to_operations_file:
- raise AuthException("Invalid permission configuration: roles_to_operations file missing")
+ if not self.roles_to_operations_file:
+ raise AuthException("Invalid permission configuration: roles_to_operations file missing")
+
if not self.roles_to_operations_table: # PROVISIONAL ?
self.roles_to_operations_table = "roles_operations" \
if config["authentication"]["backend"] == "keystone" \
else "roles"
+
+ # load role_permissions
+ def load_role_permissions(method_dict):
+ for k in method_dict:
+ if k == "ROLE_PERMISSION":
+ for method in chain(method_dict.get("METHODS", ()), method_dict.get("TODO", ())):
+ permission = method_dict["ROLE_PERMISSION"] + method.lower()
+ if permission not in self.role_permissions:
+ self.role_permissions.append(permission)
+ elif k in ("TODO", "METHODS"):
+ continue
+ else:
+ load_role_permissions(method_dict[k])
+
+ load_role_permissions(self.valid_methods)
+ for query_string in self.valid_query_string:
+ for method in ("get", "put", "patch", "post", "delete"):
+ permission = query_string.lower() + ":" + method
+ if permission not in self.role_permissions:
+ self.role_permissions.append(permission)
+
except Exception as e:
raise AuthException(str(e))
:param target_version: schema version that should be present in the database.
:return: None if OK, exception if error or version is different.
"""
- # Always reads operation to resource mapping from file (this is static, no need to store it in MongoDB)
- # Operations encoding: "<METHOD> <URL>"
- # Note: it is faster to rewrite the value than to check if it is already there or not
# PCR 28/05/2019 Commented out to allow initialization for internal backend
# if self.config["authentication"]["backend"] == "internal":
# return
- with open(self.resources_to_operations_file, "r") as stream:
- resources_to_operations_yaml = yaml.load(stream)
-
- for resource, operation in resources_to_operations_yaml["resources_to_operations"].items():
- if operation not in self.operations:
- self.operations.append(operation)
- self.resources_to_operations_mapping[resource] = operation
-
records = self.db.get_list(self.roles_to_operations_table)
# Loading permissions to MongoDB if there is not any permission.
role_with_operations["_id"] = str(uuid4())
self.db.create(self.roles_to_operations_table, role_with_operations)
+ self.logger.info("Role '{}' created at database".format(role_with_operations["name"]))
if self.config["authentication"]["backend"] != "internal":
self.backend.assign_role_to_user("admin", "admin", "system_admin")
def load_operation_to_allowed_roles(self):
"""
- Fills the internal self.operation_to_allowed_roles based on database role content and self.operations
+ Fills the internal self.operation_to_allowed_roles based on database role content and self.role_permissions
It works in a shadow copy and replace at the end to allow other threads working with the old copy
:return: None
"""
- permissions = {oper: [] for oper in self.operations}
+ permissions = {oper: [] for oper in self.role_permissions}
records = self.db.get_list(self.roles_to_operations_table)
ignore_fields = ["_id", "_admin", "name", "default"]
for record in records:
- record_permissions = {oper: record["permissions"].get("default", False) for oper in self.operations}
+ record_permissions = {oper: record["permissions"].get("default", False) for oper in self.role_permissions}
operations_joined = [(oper, value) for oper, value in record["permissions"].items()
if oper not in ignore_fields]
operations_joined.sort(key=lambda x: x[0].count(":"))
self.operation_to_allowed_roles = permissions
- def authorize(self):
+ def authorize(self, role_permission=None, query_string_operations=None):
token = None
user_passwd64 = None
try:
except Exception:
pass
outdata = self.new_token(None, {"username": user, "password": passwd})
- token = outdata["id"]
+ token = outdata["_id"]
cherrypy.session['Authorization'] = token
if not token:
token_info = self.backend.validate_token(token)
# TODO add to token info remote host, port
- self.check_permissions(token_info, cherrypy.request.path_info,
- cherrypy.request.method)
+ if role_permission:
+ self.check_permissions(token_info, cherrypy.request.method, role_permission,
+ query_string_operations)
return token_info
-
except AuthException as e:
if not isinstance(e, AuthExceptionUnauthorized):
if cherrypy.session.get('Authorization'):
cherrypy.response.headers["WWW-Authenticate"] = 'Bearer realm="{}"'.format(e)
raise
- def new_token(self, session, indata, remote):
- current_token = None
- if session:
- # current_token = session.get("token")
- current_token = session.get("_id") if self.config["authentication"]["backend"] == "keystone" \
- else session
- token_info = self.backend.authenticate(
+ def new_token(self, token_info, indata, remote):
+ new_token_info = self.backend.authenticate(
user=indata.get("username"),
password=indata.get("password"),
- token=current_token,
+ token_info=token_info,
project=indata.get("project_id")
)
- now = time()
- new_session = {
- "_id": token_info["_id"],
- "id": token_info["_id"],
- "issued_at": now,
- "expires": token_info.get("expires", now + 3600),
- "project_id": token_info["project_id"],
- "username": token_info.get("username") or session.get("username"),
- "remote_port": remote.port,
- "admin": True if token_info.get("project_name") == "admin" else False # TODO put admin in RBAC
- }
+ new_token_info["remote_port"] = remote.port
+ if not new_token_info.get("expires"):
+ new_token_info["expires"] = time() + 3600
+ if not new_token_info.get("admin"):
+ new_token_info["admin"] = True if new_token_info.get("project_name") == "admin" else False
+ # TODO put admin in RBAC
if remote.name:
- new_session["remote_host"] = remote.name
+ new_token_info["remote_host"] = remote.name
elif remote.ip:
- new_session["remote_host"] = remote.ip
+ new_token_info["remote_host"] = remote.ip
- # TODO: check if this can be avoided. Backend may provide enough information
- self.tokens_cache[token_info["_id"]] = new_session
+ self.tokens_cache[new_token_info["_id"]] = new_token_info
- return deepcopy(new_session)
+ # TODO call self._internal_tokens_prune(now) ?
+ return deepcopy(new_token_info)
- def get_token_list(self, session):
+ def get_token_list(self, token_info):
if self.config["authentication"]["backend"] == "internal":
- return self._internal_get_token_list(session)
+ return self._internal_get_token_list(token_info)
else:
# TODO: check if this can be avoided. Backend may provide enough information
return [deepcopy(token) for token in self.tokens_cache.values()
- if token["username"] == session["username"]]
+ if token["username"] == token_info["username"]]
- def get_token(self, session, token):
+ def get_token(self, token_info, token):
if self.config["authentication"]["backend"] == "internal":
- return self._internal_get_token(session, token)
+ return self._internal_get_token(token_info, token)
else:
# TODO: check if this can be avoided. Backend may provide enough information
token_value = self.tokens_cache.get(token)
if not token_value:
raise AuthException("token not found", http_code=HTTPStatus.NOT_FOUND)
- if token_value["username"] != session["username"] and not session["admin"]:
+ if token_value["username"] != token_info["username"] and not token_info["admin"]:
raise AuthException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED)
return token_value
except KeyError:
raise AuthException("Token '{}' not found".format(token), http_code=HTTPStatus.NOT_FOUND)
- def check_permissions(self, session, url, method):
- self.logger.info("Session: {}".format(session))
- self.logger.info("URL: {}".format(url))
- self.logger.info("Method: {}".format(method))
-
- key, parameters = self._normalize_url(url, method)
-
- # TODO: Check if parameters might be useful for the decision
+ def check_permissions(self, token_info, method, role_permission=None, query_string_operations=None):
+ """
+ Checks that operation has permissions to be done, base on the assigned roles to this user project
+ :param token_info: Dictionary that contains "roles" with a list of assigned roles.
+ This method fills the token_info["admin"] with True or False based on assigned tokens, if any allows admin
+ This will be used among others to hide or not the _admin content of topics
+ :param method: GET,PUT, POST, ...
+ :param role_permission: role permission name of the operation required
+ :param query_string_operations: list of possible admin query strings provided by user. It is checked that the
+ assigned role allows this query string for this method
+ :return: None if granted, exception if not allowed
+ """
- operation = self.resources_to_operations_mapping[key]
- roles_required = self.operation_to_allowed_roles[operation]
- roles_allowed = [role["name"] for role in session["roles"]]
+ roles_required = self.operation_to_allowed_roles[role_permission]
+ roles_allowed = [role["name"] for role in token_info["roles"]]
- # fills session["admin"] if some roles allows it
- session["admin"] = False
+ # fills token_info["admin"] if some roles allows it
+ token_info["admin"] = False
for role in roles_allowed:
- if role in self.operation_to_allowed_roles["admin"]:
- session["admin"] = True
+ if role in self.operation_to_allowed_roles["admin:" + method.lower()]:
+ token_info["admin"] = True
break
if "anonymous" in roles_required:
return
-
+ operation_allowed = False
for role in roles_allowed:
if role in roles_required:
- return
+ operation_allowed = True
+ # if query_string operations, check if this role allows it
+ if not query_string_operations:
+ return
+ for query_string_operation in query_string_operations:
+ if role not in self.operation_to_allowed_roles[query_string_operation]:
+ break
+ else:
+ return
- raise AuthExceptionUnauthorized("Access denied: lack of permissions.")
+ if not operation_allowed:
+ raise AuthExceptionUnauthorized("Access denied: lack of permissions.")
+ else:
+ raise AuthExceptionUnauthorized("Access denied: You have not permissions to use these admin query string")
def get_user_list(self):
return self.backend.get_user_list()
def _normalize_url(self, url, method):
+ # DEPRECATED !!!
# Removing query strings
normalized_url = url if '?' not in url else url[:url.find("?")]
normalized_url_splitted = normalized_url.split("/")
return filtered_key, parameters
- def _internal_get_token_list(self, session):
+ def _internal_get_token_list(self, token_info):
now = time()
- token_list = self.db.get_list("tokens", {"username": session["username"], "expires.gt": now})
+ token_list = self.db.get_list("tokens", {"username": token_info["username"], "expires.gt": now})
return token_list
- def _internal_get_token(self, session, token_id):
+ def _internal_get_token(self, token_info, token_id):
token_value = self.db.get_one("tokens", {"_id": token_id}, fail_on_empty=False)
if not token_value:
raise AuthException("token not found", http_code=HTTPStatus.NOT_FOUND)
- if token_value["username"] != session["username"] and not session["admin"]:
+ if token_value["username"] != token_info["username"] and not token_info["admin"]:
raise AuthException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED)
return token_value
self.sess = session.Session(auth=self.auth)
self.keystone = client.Client(session=self.sess)
- def authenticate(self, user, password, project=None, token=None):
+ def authenticate(self, user, password, project=None, token_info=None):
"""
- Authenticate a user using username/password or token, plus project
+ Authenticate a user using username/password or token_info, plus project
:param user: user: name, id or None
:param password: password or None
:param project: name, id, or None. If None first found project will be used to get an scope token
- :param token: previous token to obtain authorization
+ :param token_info: previous token_info to obtain authorization
:return: the scoped token info or raises an exception. The token is a dictionary with:
_id: token string id,
username: username,
password=password,
user_domain_name=self.user_domain_name,
project_domain_name=self.project_domain_name)
- elif token:
- unscoped_token = self.keystone.tokens.validate(token=token)
+ elif token_info:
+ unscoped_token = self.keystone.tokens.validate(token=token_info.get("_id"))
else:
raise AuthException("Provide credentials: username/password or Authorization Bearer token",
http_code=HTTPStatus.UNAUTHORIZED)
auth_token = {
"_id": scoped_token.auth_token,
+ "id": scoped_token.auth_token,
+ "user_id": scoped_token.user_id,
"username": scoped_token.username,
"project_id": scoped_token.project_id,
"project_name": scoped_token.project_name,
"expires": scoped_token.expires.timestamp(),
+ "issued_at": scoped_token.issued.timestamp()
}
return auth_token
except ClientException as e:
- self.logger.exception("Error during user authentication using keystone. Method: basic: {}".format(e))
+ # self.logger.exception("Error during user authentication using keystone. Method: basic: {}".format(e))
raise AuthException("Error during user authentication using Keystone: {}".format(e),
http_code=HTTPStatus.UNAUTHORIZED)
#
# return new_token["auth_token"], project_names
# except ClientException as e:
- # self.logger.exception("Error during user authentication using keystone. Method: bearer: {}".format(e))
+ # # self.logger.exception("Error during user authentication using keystone. Method: bearer: {}".format(e))
# raise AuthException("Error during user authentication using Keystone: {}".format(e),
# http_code=HTTPStatus.UNAUTHORIZED)
"""
Check if the token is valid.
- :param token: token to validate
+ :param token: token id to be validated
:return: dictionary with information associated with the token:
"expires":
"_id": token_id,
token_info = self.keystone.tokens.validate(token=token)
ses = {
"_id": token_info["auth_token"],
+ "id": token_info["auth_token"],
"project_id": token_info["project"]["id"],
"project_name": token_info["project"]["name"],
"user_id": token_info["user"]["id"],
"username": token_info["user"]["name"],
"roles": token_info["roles"],
- "expires": token_info.expires.timestamp()
+ "expires": token_info.expires.timestamp(),
+ "issued_at": token_info.issued.timestamp()
}
return ses
except ClientException as e:
- self.logger.exception("Error during token validation using keystone: {}".format(e))
+ # self.logger.exception("Error during token validation using keystone: {}".format(e))
raise AuthException("Error during token validation using Keystone: {}".format(e),
http_code=HTTPStatus.UNAUTHORIZED)
return True
except ClientException as e:
- self.logger.exception("Error during token revocation using keystone: {}".format(e))
+ # self.logger.exception("Error during token revocation using keystone: {}".format(e))
raise AuthException("Error during token revocation using Keystone: {}".format(e),
http_code=HTTPStatus.UNAUTHORIZED)
return project_names
except ClientException as e:
- self.logger.exception("Error during user project listing using keystone: {}".format(e))
+ # self.logger.exception("Error during user project listing using keystone: {}".format(e))
raise AuthException("Error during user project listing using Keystone: {}".format(e),
http_code=HTTPStatus.UNAUTHORIZED)
return roles
except ClientException as e:
- self.logger.exception("Error during user role listing using keystone: {}".format(e))
+ # self.logger.exception("Error during user role listing using keystone: {}".format(e))
raise AuthException("Error during user role listing using Keystone: {}".format(e),
http_code=HTTPStatus.UNAUTHORIZED)
# self.logger.exception("Error during user creation using keystone: {}".format(e))
raise AuthconnOperationException(e, http_code=HTTPStatus.CONFLICT)
except ClientException as e:
- self.logger.exception("Error during user creation using keystone: {}".format(e))
+ # self.logger.exception("Error during user creation using keystone: {}".format(e))
raise AuthconnOperationException("Error during user creation using Keystone: {}".format(e))
def update_user(self, user, new_name=None, new_password=None):
self.keystone.users.update(user_id, password=new_password, name=new_name)
except ClientException as e:
- self.logger.exception("Error during user password/name update using keystone: {}".format(e))
+ # self.logger.exception("Error during user password/name update using keystone: {}".format(e))
raise AuthconnOperationException("Error during user password/name update using Keystone: {}".format(e))
def delete_user(self, user_id):
return True
except ClientException as e:
- self.logger.exception("Error during user deletion using keystone: {}".format(e))
+ # self.logger.exception("Error during user deletion using keystone: {}".format(e))
raise AuthconnOperationException("Error during user deletion using Keystone: {}".format(e))
def get_user_list(self, filter_q=None):
return users
except ClientException as e:
- self.logger.exception("Error during user listing using keystone: {}".format(e))
+ # self.logger.exception("Error during user listing using keystone: {}".format(e))
raise AuthconnOperationException("Error during user listing using Keystone: {}".format(e))
def get_role_list(self, filter_q=None):
return roles
except ClientException as e:
- self.logger.exception("Error during user role listing using keystone: {}".format(e))
+ # self.logger.exception("Error during user role listing using keystone: {}".format(e))
raise AuthException("Error during user role listing using Keystone: {}".format(e),
http_code=HTTPStatus.UNAUTHORIZED)
except Conflict as ex:
raise AuthconnConflictException(str(ex))
except ClientException as e:
- self.logger.exception("Error during role creation using keystone: {}".format(e))
+ # self.logger.exception("Error during role creation using keystone: {}".format(e))
raise AuthconnOperationException("Error during role creation using Keystone: {}".format(e))
def delete_role(self, role_id):
return True
except ClientException as e:
- self.logger.exception("Error during role deletion using keystone: {}".format(e))
+ # self.logger.exception("Error during role deletion using keystone: {}".format(e))
raise AuthconnOperationException("Error during role deletion using Keystone: {}".format(e))
def update_role(self, role, new_name):
return projects
except ClientException as e:
- self.logger.exception("Error during user project listing using keystone: {}".format(e))
+ # self.logger.exception("Error during user project listing using keystone: {}".format(e))
raise AuthException("Error during user project listing using Keystone: {}".format(e),
http_code=HTTPStatus.UNAUTHORIZED)
result = self.keystone.projects.create(project, self.project_domain_name)
return result.id
except ClientException as e:
- self.logger.exception("Error during project creation using keystone: {}".format(e))
+ # self.logger.exception("Error during project creation using keystone: {}".format(e))
raise AuthconnOperationException("Error during project creation using Keystone: {}".format(e))
def delete_project(self, project_id):
return True
except ClientException as e:
- self.logger.exception("Error during project deletion using keystone: {}".format(e))
+ # self.logger.exception("Error during project deletion using keystone: {}".format(e))
raise AuthconnOperationException("Error during project deletion using Keystone: {}".format(e))
def update_project(self, project_id, new_name):
try:
self.keystone.projects.update(project_id, name=new_name)
except ClientException as e:
- self.logger.exception("Error during project update using keystone: {}".format(e))
+ # self.logger.exception("Error during project update using keystone: {}".format(e))
raise AuthconnOperationException("Error during project deletion using Keystone: {}".format(e))
def assign_role_to_user(self, user, project, role):
self.keystone.roles.grant(role_obj, user=user_obj, project=project_obj)
except ClientException as e:
- self.logger.exception("Error during user role assignment using keystone: {}".format(e))
+ # self.logger.exception("Error during user role assignment using keystone: {}".format(e))
raise AuthconnOperationException("Error during role '{}' assignment to user '{}' and project '{}' using "
"Keystone: {}".format(role, user, project, e))
self.keystone.roles.revoke(role_obj, user=user_obj, project=project_obj)
except ClientException as e:
- self.logger.exception("Error during user role revocation using keystone: {}".format(e))
+ # self.logger.exception("Error during user role revocation using keystone: {}".format(e))
raise AuthconnOperationException("Error during role '{}' revocation to user '{}' and project '{}' using "
"Keystone: {}".format(role, user, project, e))
Retry-After IETF RFC 7231 [19] Fri, 31 Dec 1999 23:59:59 GMT
"""
+valid_query_string = ("ADMIN", "SET_PROJECT", "FORCE", "PUBLIC")
+# ^ Contains possible administrative query string words:
+# ADMIN=True(by default)|Project|Project-list: See all elements, or elements of a project
+# (not owned by my session project).
+# PUBLIC=True(by default)|False: See/hide public elements. Set/Unset a topic to be public
+# FORCE=True(by default)|False: Force edition/deletion operations
+# SET_PROJECT=Project|Project-list: Add/Delete the topic to the projects portfolio
+
+valid_url_methods = {
+ # contains allowed URL and methods, and the role_permission name
+ "admin": {
+ "v1": {
+ "tokens": {"METHODS": ("GET", "POST", "DELETE"),
+ "ROLE_PERMISSION": "tokens:",
+ "<ID>": {"METHODS": ("GET", "DELETE"),
+ "ROLE_PERMISSION": "tokens:id:"
+ }
+ },
+ "users": {"METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "users:",
+ "<ID>": {"METHODS": ("GET", "POST", "DELETE", "PATCH", "PUT"),
+ "ROLE_PERMISSION": "users:id:"
+ }
+ },
+ "projects": {"METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "projects:",
+ "<ID>": {"METHODS": ("GET", "DELETE", "PUT"),
+ "ROLE_PERMISSION": "projects:id:"}
+ },
+ "roles": {"METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "roles:",
+ "<ID>": {"METHODS": ("GET", "POST", "DELETE", "PUT"),
+ "ROLE_PERMISSION": "roles:id:"
+ }
+ },
+ "vims": {"METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "vims:",
+ "<ID>": {"METHODS": ("GET", "DELETE", "PATCH", "PUT"),
+ "ROLE_PERMISSION": "vims:id:"
+ }
+ },
+ "vim_accounts": {"METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "vim_accounts:",
+ "<ID>": {"METHODS": ("GET", "DELETE", "PATCH", "PUT"),
+ "ROLE_PERMISSION": "vim_accounts:id:"
+ }
+ },
+ "wim_accounts": {"METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "wim_accounts:",
+ "<ID>": {"METHODS": ("GET", "DELETE", "PATCH", "PUT"),
+ "ROLE_PERMISSION": "wim_accounts:id:"
+ }
+ },
+ "sdns": {"METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "sdn_controllers:",
+ "<ID>": {"METHODS": ("GET", "DELETE", "PATCH", "PUT"),
+ "ROLE_PERMISSION": "sdn_controllers:id:"
+ }
+ },
+ }
+ },
+ "pdu": {
+ "v1": {
+ "pdu_descriptors": {"METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "pduds:",
+ "<ID>": {"METHODS": ("GET", "POST", "DELETE", "PATCH", "PUT"),
+ "ROLE_PERMISSION": "pduds:id:"
+ }
+ },
+ }
+ },
+ "nsd": {
+ "v1": {
+ "ns_descriptors_content": {"METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "nsds:",
+ "<ID>": {"METHODS": ("GET", "PUT", "DELETE"),
+ "ROLE_PERMISSION": "nsds:id:"
+ }
+ },
+ "ns_descriptors": {"METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "nsds:",
+ "<ID>": {"METHODS": ("GET", "DELETE", "PATCH"),
+ "ROLE_PERMISSION": "nsds:id:",
+ "nsd_content": {"METHODS": ("GET", "PUT"),
+ "ROLE_PERMISSION": "nsds:id:content:",
+ },
+ "nsd": {"METHODS": ("GET",), # descriptor inside package
+ "ROLE_PERMISSION": "nsds:id:content:"
+ },
+ "artifacts": {"*": {"METHODS": ("GET",),
+ "ROLE_PERMISSION": "nsds:id:nsd_artifact:"
+ }
+ }
+ }
+ },
+ "pnf_descriptors": {"TODO": ("GET", "POST"),
+ "<ID>": {"TODO": ("GET", "DELETE", "PATCH"),
+ "pnfd_content": {"TODO": ("GET", "PUT")}
+ }
+ },
+ "subscriptions": {"TODO": ("GET", "POST"),
+ "<ID>": {"TODO": ("GET", "DELETE")}
+ },
+ }
+ },
+ "vnfpkgm": {
+ "v1": {
+ "vnf_packages_content": {"METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "vnfds:",
+ "<ID>": {"METHODS": ("GET", "PUT", "DELETE"),
+ "ROLE_PERMISSION": "vnfds:id:"}
+ },
+ "vnf_packages": {"METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "vnfds:",
+ "<ID>": {"METHODS": ("GET", "DELETE", "PATCH"), # GET: vnfPkgInfo
+ "ROLE_PERMISSION": "vnfds:id:",
+ "package_content": {"METHODS": ("GET", "PUT"), # package
+ "ROLE_PERMISSION": "vnfds:id:",
+ "upload_from_uri": {"METHODS": (),
+ "TODO": ("POST", ),
+ "ROLE_PERMISSION": "vnfds:id:upload:"
+ }
+ },
+ "vnfd": {"METHODS": ("GET", ), # descriptor inside package
+ "ROLE_PERMISSION": "vnfds:id:content:"
+ },
+ "artifacts": {"*": {"METHODS": ("GET", ),
+ "ROLE_PERMISSION": "vnfds:id:vnfd_artifact:"
+ }
+ }
+ }
+ },
+ "subscriptions": {"TODO": ("GET", "POST"),
+ "<ID>": {"TODO": ("GET", "DELETE")}
+ },
+ }
+ },
+ "nslcm": {
+ "v1": {
+ "ns_instances_content": {"METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "ns_instances:",
+ "<ID>": {"METHODS": ("GET", "DELETE"),
+ "ROLE_PERMISSION": "ns_instances:id:"
+ }
+ },
+ "ns_instances": {"METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "ns_instances:",
+ "<ID>": {"METHODS": ("GET", "DELETE"),
+ "ROLE_PERMISSION": "ns_instances:id:",
+ "scale": {"METHODS": ("POST",),
+ "ROLE_PERMISSION": "ns_instances:id:scale:"
+ },
+ "terminate": {"METHODS": ("POST",),
+ "ROLE_PERMISSION": "ns_instances:id:terminate:"
+ },
+ "instantiate": {"METHODS": ("POST",),
+ "ROLE_PERMISSION": "ns_instances:id:instantiate:"
+ },
+ "action": {"METHODS": ("POST",),
+ "ROLE_PERMISSION": "ns_instances:id:action:"
+ },
+ }
+ },
+ "ns_lcm_op_occs": {"METHODS": ("GET",),
+ "ROLE_PERMISSION": "ns_instances:opps:",
+ "<ID>": {"METHODS": ("GET",),
+ "ROLE_PERMISSION": "ns_instances:opps:id:"
+ },
+ },
+ "vnfrs": {"METHODS": ("GET",),
+ "ROLE_PERMISSION": "vnf_instances:",
+ "<ID>": {"METHODS": ("GET",),
+ "ROLE_PERMISSION": "vnf_instances:id:"
+ }
+ },
+ "vnf_instances": {"METHODS": ("GET",),
+ "ROLE_PERMISSION": "vnf_instances:",
+ "<ID>": {"METHODS": ("GET",),
+ "ROLE_PERMISSION": "vnf_instances:id:"
+ }
+ },
+ }
+ },
+ "nst": {
+ "v1": {
+ "netslice_templates_content": {"METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "slice_templates:",
+ "<ID>": {"METHODS": ("GET", "PUT", "DELETE"),
+ "ROLE_PERMISSION": "slice_templates:id:", }
+ },
+ "netslice_templates": {"METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "slice_templates:",
+ "<ID>": {"METHODS": ("GET", "DELETE"),
+ "TODO": ("PATCH",),
+ "ROLE_PERMISSION": "slice_templates:id:",
+ "nst_content": {"METHODS": ("GET", "PUT"),
+ "ROLE_PERMISSION": "slice_templates:id:content:"
+ },
+ "nst": {"METHODS": ("GET",), # descriptor inside package
+ "ROLE_PERMISSION": "slice_templates:id:content:"
+ },
+ "artifacts": {"*": {"METHODS": ("GET",),
+ "ROLE_PERMISSION": "slice_templates:id:content:"
+ }
+ }
+ }
+ },
+ "subscriptions": {"TODO": ("GET", "POST"),
+ "<ID>": {"TODO": ("GET", "DELETE")}
+ },
+ }
+ },
+ "nsilcm": {
+ "v1": {
+ "netslice_instances_content": {"METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "slice_instances:",
+ "<ID>": {"METHODS": ("GET", "DELETE"),
+ "ROLE_PERMISSION": "slice_instances:id:"
+ }
+ },
+ "netslice_instances": {"METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "slice_instances:",
+ "<ID>": {"METHODS": ("GET", "DELETE"),
+ "ROLE_PERMISSION": "slice_instances:id:",
+ "terminate": {"METHODS": ("POST",),
+ "ROLE_PERMISSION": "slice_instances:id:terminate:"
+ },
+ "instantiate": {"METHODS": ("POST",),
+ "ROLE_PERMISSION": "slice_instances:id:instantiate:"
+ },
+ "action": {"METHODS": ("POST",),
+ "ROLE_PERMISSION": "slice_instances:id:action:"
+ },
+ }
+ },
+ "nsi_lcm_op_occs": {"METHODS": ("GET",),
+ "ROLE_PERMISSION": "slice_instances:opps:",
+ "<ID>": {"METHODS": ("GET",),
+ "ROLE_PERMISSION": "slice_instances:opps:id:",
+ },
+ },
+ }
+ },
+ "nspm": {
+ "v1": {
+ "pm_jobs": {
+ "<ID>": {
+ "reports": {
+ "<ID>": {"METHODS": ("GET",),
+ "ROLE_PERMISSION": "reports:id:",
+ }
+ }
+ },
+ },
+ },
+ },
+}
+
class NbiException(Exception):
def __init__(self):
self.instance += 1
self.engine = Engine()
- self.authenticator = Authenticator()
- self.valid_methods = { # contains allowed URL and methods
- "admin": {
- "v1": {
- "tokens": {"METHODS": ("GET", "POST", "DELETE"),
- "<ID>": {"METHODS": ("GET", "DELETE")}
- },
- "users": {"METHODS": ("GET", "POST"),
- "<ID>": {"METHODS": ("GET", "POST", "DELETE", "PATCH", "PUT")}
- },
- "projects": {"METHODS": ("GET", "POST"),
- "<ID>": {"METHODS": ("GET", "DELETE", "PUT")}
- },
- "roles": {"METHODS": ("GET", "POST"),
- "<ID>": {"METHODS": ("GET", "POST", "DELETE", "PUT")}
- },
- "vims": {"METHODS": ("GET", "POST"),
- "<ID>": {"METHODS": ("GET", "DELETE", "PATCH", "PUT")}
- },
- "vim_accounts": {"METHODS": ("GET", "POST"),
- "<ID>": {"METHODS": ("GET", "DELETE", "PATCH", "PUT")}
- },
- "wim_accounts": {"METHODS": ("GET", "POST"),
- "<ID>": {"METHODS": ("GET", "DELETE", "PATCH", "PUT")}
- },
- "sdns": {"METHODS": ("GET", "POST"),
- "<ID>": {"METHODS": ("GET", "DELETE", "PATCH", "PUT")}
- },
- }
- },
- "pdu": {
- "v1": {
- "pdu_descriptors": {"METHODS": ("GET", "POST"),
- "<ID>": {"METHODS": ("GET", "POST", "DELETE", "PATCH", "PUT")}
- },
- }
- },
- "nsd": {
- "v1": {
- "ns_descriptors_content": {"METHODS": ("GET", "POST"),
- "<ID>": {"METHODS": ("GET", "PUT", "DELETE")}
- },
- "ns_descriptors": {"METHODS": ("GET", "POST"),
- "<ID>": {"METHODS": ("GET", "DELETE", "PATCH"),
- "nsd_content": {"METHODS": ("GET", "PUT")},
- "nsd": {"METHODS": "GET"}, # descriptor inside package
- "artifacts": {"*": {"METHODS": "GET"}}
- }
- },
- "pnf_descriptors": {"TODO": ("GET", "POST"),
- "<ID>": {"TODO": ("GET", "DELETE", "PATCH"),
- "pnfd_content": {"TODO": ("GET", "PUT")}
- }
- },
- "subscriptions": {"TODO": ("GET", "POST"),
- "<ID>": {"TODO": ("GET", "DELETE")}
- },
- }
- },
- "vnfpkgm": {
- "v1": {
- "vnf_packages_content": {"METHODS": ("GET", "POST"),
- "<ID>": {"METHODS": ("GET", "PUT", "DELETE")}
- },
- "vnf_packages": {"METHODS": ("GET", "POST"),
- "<ID>": {"METHODS": ("GET", "DELETE", "PATCH"), # GET: vnfPkgInfo
- "package_content": {"METHODS": ("GET", "PUT"), # package
- "upload_from_uri": {"TODO": "POST"}
- },
- "vnfd": {"METHODS": "GET"}, # descriptor inside package
- "artifacts": {"*": {"METHODS": "GET"}}
- }
- },
- "subscriptions": {"TODO": ("GET", "POST"),
- "<ID>": {"TODO": ("GET", "DELETE")}
- },
- }
- },
- "nslcm": {
- "v1": {
- "ns_instances_content": {"METHODS": ("GET", "POST"),
- "<ID>": {"METHODS": ("GET", "DELETE")}
- },
- "ns_instances": {"METHODS": ("GET", "POST"),
- "<ID>": {"METHODS": ("GET", "DELETE"),
- "scale": {"METHODS": "POST"},
- "terminate": {"METHODS": "POST"},
- "instantiate": {"METHODS": "POST"},
- "action": {"METHODS": "POST"},
- }
- },
- "ns_lcm_op_occs": {"METHODS": "GET",
- "<ID>": {"METHODS": "GET"},
- },
- "vnfrs": {"METHODS": ("GET"),
- "<ID>": {"METHODS": ("GET")}
- },
- "vnf_instances": {"METHODS": ("GET"),
- "<ID>": {"METHODS": ("GET")}
- },
- }
- },
- "nst": {
- "v1": {
- "netslice_templates_content": {"METHODS": ("GET", "POST"),
- "<ID>": {"METHODS": ("GET", "PUT", "DELETE")}
- },
- "netslice_templates": {"METHODS": ("GET", "POST"),
- "<ID>": {"METHODS": ("GET", "DELETE"), "TODO": "PATCH",
- "nst_content": {"METHODS": ("GET", "PUT")},
- "nst": {"METHODS": "GET"}, # descriptor inside package
- "artifacts": {"*": {"METHODS": "GET"}}
- }
- },
- "subscriptions": {"TODO": ("GET", "POST"),
- "<ID>": {"TODO": ("GET", "DELETE")}
- },
- }
- },
- "nsilcm": {
- "v1": {
- "netslice_instances_content": {"METHODS": ("GET", "POST"),
- "<ID>": {"METHODS": ("GET", "DELETE")}
- },
- "netslice_instances": {"METHODS": ("GET", "POST"),
- "<ID>": {"METHODS": ("GET", "DELETE"),
- "terminate": {"METHODS": "POST"},
- "instantiate": {"METHODS": "POST"},
- "action": {"METHODS": "POST"},
- }
- },
- "nsi_lcm_op_occs": {"METHODS": "GET",
- "<ID>": {"METHODS": "GET"},
- },
- }
- },
- "nspm": {
- "v1": {
- "pm_jobs": {
- "<ID>": {
- "reports": {
- "<ID>": {"METHODS": ("GET")}
- }
- },
- },
- },
- },
- }
+ self.authenticator = Authenticator(valid_url_methods, valid_query_string)
def _format_in(self, kwargs):
try:
raise NbiException(error_text + str(exc), HTTPStatus.BAD_REQUEST)
@staticmethod
- def _format_out(data, session=None, _format=None):
+ def _format_out(data, token_info=None, _format=None):
"""
return string of dictionary data according to requested json, yaml, xml. By default json
:param data: response to be sent. Can be a dict, text or file
- :param session:
+ :param token_info: Contains among other username and project
:param _format: The format to be set as Content-Type ir data is a file
:return: None
"""
accept = cherrypy.request.headers.get("Accept")
if data is None:
if accept and "text/html" in accept:
- return html.format(data, cherrypy.request, cherrypy.response, session)
+ return html.format(data, cherrypy.request, cherrypy.response, token_info)
# cherrypy.response.status = HTTPStatus.NO_CONTENT.value
return
elif hasattr(data, "read"): # file object
a = json.dumps(data, indent=4) + "\n"
return a.encode("utf8")
elif "text/html" in accept:
- return html.format(data, cherrypy.request, cherrypy.response, session)
+ return html.format(data, cherrypy.request, cherrypy.response, token_info)
elif "application/yaml" in accept or "*/*" in accept or "text/plain" in accept:
pass
@cherrypy.expose
def index(self, *args, **kwargs):
- session = None
+ token_info = None
try:
if cherrypy.request.method == "GET":
- session = self.authenticator.authorize()
+ token_info = self.authenticator.authorize()
outdata = "Index page"
else:
raise cherrypy.HTTPError(HTTPStatus.METHOD_NOT_ALLOWED.value,
"Method {} not allowed for tokens".format(cherrypy.request.method))
- return self._format_out(outdata, session)
+ return self._format_out(outdata, token_info)
except (EngineException, AuthException) as e:
- cherrypy.log("index Exception {}".format(e))
+ # cherrypy.log("index Exception {}".format(e))
cherrypy.response.status = e.http_code.value
- return self._format_out("Welcome to OSM!", session)
+ return self._format_out("Welcome to OSM!", token_info)
@cherrypy.expose
def version(self, *args, **kwargs):
@cherrypy.expose
def token(self, method, token_id=None, kwargs=None):
- session = None
+ token_info = None
# self.engine.load_dbase(cherrypy.request.app.config)
indata = self._format_in(kwargs)
if not isinstance(indata, dict):
raise NbiException("Expected application/yaml or application/json Content-Type", HTTPStatus.BAD_REQUEST)
try:
if method == "GET":
- session = self.authenticator.authorize()
+ token_info = self.authenticator.authorize()
if token_id:
- outdata = self.authenticator.get_token(session, token_id)
+ outdata = self.authenticator.get_token(token_info, token_id)
else:
- outdata = self.authenticator.get_token_list(session)
+ outdata = self.authenticator.get_token_list(token_info)
elif method == "POST":
try:
- session = self.authenticator.authorize()
+ token_info = self.authenticator.authorize()
except Exception:
- session = None
+ token_info = None
if kwargs:
indata.update(kwargs)
- outdata = self.authenticator.new_token(session, indata, cherrypy.request.remote)
- session = outdata
+ outdata = self.authenticator.new_token(token_info, indata, cherrypy.request.remote)
+ token_info = outdata
cherrypy.session['Authorization'] = outdata["_id"]
self._set_location_header("admin", "v1", "tokens", outdata["_id"])
# cherrypy.response.cookie["Authorization"] = outdata["id"]
if not token_id and "id" in kwargs:
token_id = kwargs["id"]
elif not token_id:
- session = self.authenticator.authorize()
- token_id = session["_id"]
+ token_info = self.authenticator.authorize()
+ token_id = token_info["_id"]
outdata = self.authenticator.del_token(token_id)
- session = None
+ token_info = None
cherrypy.session['Authorization'] = "logout"
# cherrypy.response.cookie["Authorization"] = token_id
# cherrypy.response.cookie["Authorization"]['expires'] = 0
else:
raise NbiException("Method {} not allowed for token".format(method), HTTPStatus.METHOD_NOT_ALLOWED)
- return self._format_out(outdata, session)
+ return self._format_out(outdata, token_info)
except (NbiException, EngineException, DbException, AuthException) as e:
cherrypy.log("tokens Exception {}".format(e))
cherrypy.response.status = e.http_code.value
"status": e.http_code.value,
"detail": str(e),
}
- return self._format_out(problem_details, session)
+ return self._format_out(problem_details, token_info)
@cherrypy.expose
def test(self, *args, **kwargs):
return_text += "</pre></html>"
return return_text
- def _check_valid_url_method(self, method, *args):
+ @staticmethod
+ def _check_valid_url_method(method, *args):
if len(args) < 3:
raise NbiException("URL must contain at least 'main_topic/version/topic'", HTTPStatus.METHOD_NOT_ALLOWED)
- reference = self.valid_methods
+ reference = valid_url_methods
for arg in args:
if arg is None:
break
raise NbiException("Method {} not supported yet for this URL".format(method), HTTPStatus.NOT_IMPLEMENTED)
elif "METHODS" in reference and method not in reference["METHODS"]:
raise NbiException("Method {} not supported for this URL".format(method), HTTPStatus.METHOD_NOT_ALLOWED)
- return
+ return reference["ROLE_PERMISSION"] + method.lower()
@staticmethod
def _set_location_header(main_topic, version, topic, id):
return
@staticmethod
- def _manage_admin_query(session, kwargs, method, _id):
+ def _extract_query_string_operations(kwargs, method):
+ """
+
+ :param kwargs:
+ :return:
+ """
+ query_string_operations = []
+ if kwargs:
+ for qs in ("FORCE", "PUBLIC", "ADMIN", "SET_PROJECT"):
+ if qs in kwargs and kwargs[qs].lower() != "false":
+ query_string_operations.append(qs.lower() + ":" + method.lower())
+ return query_string_operations
+
+ @staticmethod
+ def _manage_admin_query(token_info, kwargs, method, _id):
"""
Processes the administrator query inputs (if any) of FORCE, ADMIN, PUBLIC, SET_PROJECT
Check that users has rights to use them and returs the admin_query
- :param session: session rights obtained by token
+ :param token_info: token_info rights obtained by token
:param kwargs: query string input.
:param method: http method: GET, POSST, PUT, ...
:param _id:
set_project: tuple with projects that a created element will belong to
method: show, list, delete, write
"""
- admin_query = {"force": False, "project_id": (session["project_id"], ), "username": session["username"],
- "admin": session["admin"], "public": None}
+ admin_query = {"force": False, "project_id": (token_info["project_id"], ), "username": token_info["username"],
+ "admin": token_info["admin"], "public": None}
if kwargs:
# FORCE
if "FORCE" in kwargs:
if "ADMIN" in kwargs:
behave_as = kwargs.pop("ADMIN")
if behave_as.lower() != "false":
- if not session["admin"]:
+ if not token_info["admin"]:
raise NbiException("Only admin projects can use 'ADMIN' query string", HTTPStatus.UNAUTHORIZED)
if not behave_as or behave_as.lower() == "true": # convert True, None to empty list
admin_query["project_id"] = ()
# PROJECT_READ
# if "PROJECT_READ" in kwargs:
# admin_query["project"] = kwargs.pop("project")
- # if admin_query["project"] == session["project_id"]:
+ # if admin_query["project"] == token_info["project_id"]:
if method == "GET":
if _id:
admin_query["method"] = "show"
@cherrypy.expose
def default(self, main_topic=None, version=None, topic=None, _id=None, item=None, *args, **kwargs):
- session = None
+ token_info = None
outdata = None
_format = None
method = "DONE"
engine_topic = None
rollback = []
- session = None
+ engine_session = None
try:
if not main_topic or not version or not topic:
raise NbiException("URL must contain at least 'main_topic/version/topic'",
else:
method = cherrypy.request.method
- self._check_valid_url_method(method, main_topic, version, topic, _id, item, *args)
-
+ role_permission = self._check_valid_url_method(method, main_topic, version, topic, _id, item, *args)
+ query_string_operations = self._extract_query_string_operations(kwargs, method)
if main_topic == "admin" and topic == "tokens":
return self.token(method, _id, kwargs)
# self.engine.load_dbase(cherrypy.request.app.config)
- session = self.authenticator.authorize()
- session = self._manage_admin_query(session, kwargs, method, _id)
+
+ token_info = self.authenticator.authorize(role_permission, query_string_operations)
+ engine_session = self._manage_admin_query(token_info, kwargs, method, _id)
indata = self._format_in(kwargs)
engine_topic = topic
if topic == "subscriptions":
path = ()
else:
path = None
- file, _format = self.engine.get_file(session, engine_topic, _id, path,
+ file, _format = self.engine.get_file(engine_session, engine_topic, _id, path,
cherrypy.request.headers.get("Accept"))
outdata = file
elif not _id:
- outdata = self.engine.get_item_list(session, engine_topic, kwargs)
+ outdata = self.engine.get_item_list(engine_session, engine_topic, kwargs)
else:
if item == "reports":
# TODO check that project_id (_id in this context) has permissions
_id = args[0]
- outdata = self.engine.get_item(session, engine_topic, _id)
+ outdata = self.engine.get_item(engine_session, engine_topic, _id)
elif method == "POST":
if topic in ("ns_descriptors_content", "vnf_packages_content", "netslice_templates_content"):
_id = cherrypy.request.headers.get("Transaction-Id")
if not _id:
- _id = self.engine.new_item(rollback, session, engine_topic, {}, None, cherrypy.request.headers)
- completed = self.engine.upload_content(session, engine_topic, _id, indata, kwargs,
+ _id = self.engine.new_item(rollback, engine_session, engine_topic, {}, None,
+ cherrypy.request.headers)
+ completed = self.engine.upload_content(engine_session, engine_topic, _id, indata, kwargs,
cherrypy.request.headers)
if completed:
self._set_location_header(main_topic, version, topic, _id)
outdata = {"id": _id}
elif topic == "ns_instances_content":
# creates NSR
- _id = self.engine.new_item(rollback, session, engine_topic, indata, kwargs)
+ _id = self.engine.new_item(rollback, engine_session, engine_topic, indata, kwargs)
# creates nslcmop
indata["lcmOperationType"] = "instantiate"
indata["nsInstanceId"] = _id
- nslcmop_id = self.engine.new_item(rollback, session, "nslcmops", indata, None)
+ nslcmop_id = self.engine.new_item(rollback, engine_session, "nslcmops", indata, None)
self._set_location_header(main_topic, version, topic, _id)
outdata = {"id": _id, "nslcmop_id": nslcmop_id}
elif topic == "ns_instances" and item:
indata["lcmOperationType"] = item
indata["nsInstanceId"] = _id
- _id = self.engine.new_item(rollback, session, "nslcmops", indata, kwargs)
+ _id = self.engine.new_item(rollback, engine_session, "nslcmops", indata, kwargs)
self._set_location_header(main_topic, version, "ns_lcm_op_occs", _id)
outdata = {"id": _id}
cherrypy.response.status = HTTPStatus.ACCEPTED.value
elif topic == "netslice_instances_content":
# creates NetSlice_Instance_record (NSIR)
- _id = self.engine.new_item(rollback, session, engine_topic, indata, kwargs)
+ _id = self.engine.new_item(rollback, engine_session, engine_topic, indata, kwargs)
self._set_location_header(main_topic, version, topic, _id)
indata["lcmOperationType"] = "instantiate"
indata["netsliceInstanceId"] = _id
- nsilcmop_id = self.engine.new_item(rollback, session, "nsilcmops", indata, kwargs)
+ nsilcmop_id = self.engine.new_item(rollback, engine_session, "nsilcmops", indata, kwargs)
outdata = {"id": _id, "nsilcmop_id": nsilcmop_id}
elif topic == "netslice_instances" and item:
indata["lcmOperationType"] = item
indata["netsliceInstanceId"] = _id
- _id = self.engine.new_item(rollback, session, "nsilcmops", indata, kwargs)
+ _id = self.engine.new_item(rollback, engine_session, "nsilcmops", indata, kwargs)
self._set_location_header(main_topic, version, "nsi_lcm_op_occs", _id)
outdata = {"id": _id}
cherrypy.response.status = HTTPStatus.ACCEPTED.value
else:
- _id = self.engine.new_item(rollback, session, engine_topic, indata, kwargs,
+ _id = self.engine.new_item(rollback, engine_session, engine_topic, indata, kwargs,
cherrypy.request.headers)
self._set_location_header(main_topic, version, topic, _id)
outdata = {"id": _id}
elif method == "DELETE":
if not _id:
- outdata = self.engine.del_item_list(session, engine_topic, kwargs)
+ outdata = self.engine.del_item_list(engine_session, engine_topic, kwargs)
cherrypy.response.status = HTTPStatus.OK.value
else: # len(args) > 1
delete_in_process = False
- if topic == "ns_instances_content" and not session["force"]:
+ if topic == "ns_instances_content" and not engine_session["force"]:
nslcmop_desc = {
"lcmOperationType": "terminate",
"nsInstanceId": _id,
"autoremove": True
}
- opp_id = self.engine.new_item(rollback, session, "nslcmops", nslcmop_desc, None)
+ opp_id = self.engine.new_item(rollback, engine_session, "nslcmops", nslcmop_desc, None)
if opp_id:
delete_in_process = True
outdata = {"_id": opp_id}
cherrypy.response.status = HTTPStatus.ACCEPTED.value
- elif topic == "netslice_instances_content" and not session["force"]:
+ elif topic == "netslice_instances_content" and not engine_session["force"]:
nsilcmop_desc = {
"lcmOperationType": "terminate",
"netsliceInstanceId": _id,
"autoremove": True
}
- opp_id = self.engine.new_item(rollback, session, "nsilcmops", nsilcmop_desc, None)
+ opp_id = self.engine.new_item(rollback, engine_session, "nsilcmops", nsilcmop_desc, None)
if opp_id:
delete_in_process = True
outdata = {"_id": opp_id}
cherrypy.response.status = HTTPStatus.ACCEPTED.value
if not delete_in_process:
- self.engine.del_item(session, engine_topic, _id)
+ self.engine.del_item(engine_session, engine_topic, _id)
cherrypy.response.status = HTTPStatus.NO_CONTENT.value
if engine_topic in ("vim_accounts", "wim_accounts", "sdns"):
cherrypy.response.status = HTTPStatus.ACCEPTED.value
elif method in ("PUT", "PATCH"):
outdata = None
- if not indata and not kwargs and not session.get("set_project"):
+ if not indata and not kwargs and not engine_session.get("set_project"):
raise NbiException("Nothing to update. Provide payload and/or query string",
HTTPStatus.BAD_REQUEST)
if item in ("nsd_content", "package_content", "nst_content") and method == "PUT":
- completed = self.engine.upload_content(session, engine_topic, _id, indata, kwargs,
+ completed = self.engine.upload_content(engine_session, engine_topic, _id, indata, kwargs,
cherrypy.request.headers)
if not completed:
cherrypy.response.headers["Transaction-Id"] = id
else:
- self.engine.edit_item(session, engine_topic, _id, indata, kwargs)
+ self.engine.edit_item(engine_session, engine_topic, _id, indata, kwargs)
cherrypy.response.status = HTTPStatus.NO_CONTENT.value
else:
raise NbiException("Method {} not allowed".format(method), HTTPStatus.METHOD_NOT_ALLOWED)
# if Role information changes, it is needed to reload the information of roles
if topic == "roles" and method != "GET":
self.authenticator.load_operation_to_allowed_roles()
- return self._format_out(outdata, session, _format)
+ return self._format_out(outdata, token_info, _format)
except Exception as e:
if isinstance(e, (NbiException, EngineException, DbException, FsException, MsgException, AuthException,
ValidationError)):
"status": http_code_value,
"detail": error_text,
}
- return self._format_out(problem_details, session)
+ return self._format_out(problem_details, token_info)
# raise cherrypy.HTTPError(e.http_code.value, str(e))