from uuid import uuid4
from http import HTTPStatus
from time import time
-from osm_common.dbbase import deep_update_rfc7396
+from osm_common.dbbase import deep_update_rfc7396, DbException
from osm_nbi.validation import validate_input, ValidationError, is_valid_uuid
from yaml import safe_load, YAMLError
class EngineException(Exception):
-
def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
self.http_code = http_code
super(Exception, self).__init__(message)
+class NBIBadArgumentsException(Exception):
+ """
+ Bad argument values exception
+ """
+
+ def __init__(self, message: str = "", bad_args: list = None):
+ Exception.__init__(self, message)
+ self.message = message
+ self.bad_args = bad_args
+
+ def __str__(self):
+ return "{}, Bad arguments: {}".format(self.message, self.bad_args)
+
+
def deep_get(target_dict, key_list):
"""
Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
return target_dict
+def detect_descriptor_usage(descriptor: dict, db_collection: str, db: object) -> bool:
+ """Detect the descriptor usage state.
+
+ Args:
+ descriptor (dict): VNF or NS Descriptor as dictionary
+ db_collection (str): collection name which is looked for in DB
+ db (object): name of db object
+
+ Returns:
+ True if descriptor is in use else None
+
+ """
+ try:
+ if not descriptor:
+ raise NBIBadArgumentsException(
+ "Argument is mandatory and can not be empty", "descriptor"
+ )
+
+ if not db:
+ raise NBIBadArgumentsException("A valid DB object should be provided", "db")
+
+ search_dict = {
+ "vnfds": ("vnfrs", "vnfd-id"),
+ "nsds": ("nsrs", "nsd-id"),
+ }
+
+ if db_collection not in search_dict:
+ raise NBIBadArgumentsException(
+ "db_collection should be equal to vnfds or nsds", "db_collection"
+ )
+
+ record_list = db.get_list(
+ search_dict[db_collection][0],
+ {search_dict[db_collection][1]: descriptor["_id"]},
+ )
+
+ if record_list:
+ return True
+
+ except (DbException, KeyError, NBIBadArgumentsException) as error:
+ raise EngineException(
+ f"Error occured while detecting the descriptor usage: {error}"
+ )
+
+
+def update_descriptor_usage_state(
+ descriptor: dict, db_collection: str, db: object
+) -> None:
+ """Updates the descriptor usage state.
+
+ Args:
+ descriptor (dict): VNF or NS Descriptor as dictionary
+ db_collection (str): collection name which is looked for in DB
+ db (object): name of db object
+
+ Returns:
+ None
+
+ """
+ try:
+ descriptor_update = {
+ "_admin.usageState": "NOT_IN_USE",
+ }
+
+ if detect_descriptor_usage(descriptor, db_collection, db):
+ descriptor_update = {
+ "_admin.usageState": "IN_USE",
+ }
+
+ db.set_one(
+ db_collection, {"_id": descriptor["_id"]}, update_dict=descriptor_update
+ )
+
+ except (DbException, KeyError, NBIBadArgumentsException) as error:
+ raise EngineException(
+ f"Error occured while updating the descriptor usage state: {error}"
+ )
+
+
def get_iterable(input_var):
"""
Returns an iterable, in case input_var is None it just returns an empty tuple
return tuple(filled)
+def increment_ip_mac(ip_mac, vm_index=1):
+ if not isinstance(ip_mac, str):
+ return ip_mac
+ try:
+ # try with ipv4 look for last dot
+ i = ip_mac.rfind(".")
+ if i > 0:
+ i += 1
+ return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
+ # try with ipv6 or mac look for last colon. Operate in hex
+ i = ip_mac.rfind(":")
+ if i > 0:
+ i += 1
+ # format in hex, len can be 2 for mac or 4 for ipv6
+ return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
+ ip_mac[:i], int(ip_mac[i:], 16) + vm_index
+ )
+ except Exception:
+ pass
+ return None
+
+
class BaseTopic:
# static variables for all instance classes
- topic = None # to_override
- topic_msg = None # to_override
- quota_name = None # to_override. If not provided topic will be used for quota_name
- schema_new = None # to_override
+ topic = None # to_override
+ topic_msg = None # to_override
+ quota_name = None # to_override. If not provided topic will be used for quota_name
+ schema_new = None # to_override
schema_edit = None # to_override
multiproject = True # True if this Topic can be shared by several projects. Then it contains _admin.projects_read
default_quota = 500
# Alternative ID Fields for some Topics
- alt_id_field = {
- "projects": "name",
- "users": "username",
- "roles": "name"
- }
+ alt_id_field = {"projects": "name", "users": "username", "roles": "name"}
def __init__(self, db, fs, msg, auth):
self.db = db
count = self.db.count(self.topic, {"_admin.projects_read": pid})
if count >= quota:
name = proj["name"]
- raise ValidationError("quota ({}={}) exceeded for project {} ({})".format(quota_name, quota, name, pid),
- http_code=HTTPStatus.UNAUTHORIZED)
+ raise ValidationError(
+ "quota ({}={}) exceeded for project {} ({})".format(
+ quota_name, quota, name, pid
+ ),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
def _validate_input_new(self, input, force=False):
"""
validate_input(input, self.schema_new)
return input
- def _validate_input_edit(self, input, force=False):
+ def _validate_input_edit(self, input, content, force=False):
"""
Validates input user content for an edition. It uses jsonschema. Some overrides will use pyangbind
:param input: user input content for the new topic
not present or contains ANY mean public.
:param session: contains:
project_id: project list this session has rights to access. Can be empty, one or several
- set_project: items created will contain this project list
+ set_project: items created will contain this project list
force: True or False
public: True, False or None
method: "list", "show", "write", "delete"
project_filter_n.append(session["PROJECT.ne"])
if project_filter:
- if session["method"] in ("list", "show", "delete") or session.get("set_project"):
+ if session["method"] in ("list", "show", "delete") or session.get(
+ "set_project"
+ ):
p_filter["_admin.projects_read.cont"] = project_filter
else:
p_filter["_admin.projects_write.cont"] = project_filter
if project_filter_n:
- if session["method"] in ("list", "show", "delete") or session.get("set_project"):
+ if session["method"] in ("list", "show", "delete") or session.get(
+ "set_project"
+ ):
p_filter["_admin.projects_read.ncont"] = project_filter_n
else:
p_filter["_admin.projects_write.ncont"] = project_filter_n
:param final_content: data once modified. This method may change it.
:param edit_content: incremental data that contains the modifications to apply
:param _id: internal _id
- :return: None or raises EngineException
+ :return: final_content or raises EngineException
"""
if not self.multiproject:
- return
+ return final_content
# Change public status
if session["public"] is not None:
- if session["public"] and "ANY" not in final_content["_admin"]["projects_read"]:
+ if (
+ session["public"]
+ and "ANY" not in final_content["_admin"]["projects_read"]
+ ):
final_content["_admin"]["projects_read"].append("ANY")
final_content["_admin"]["projects_write"].clear()
- if not session["public"] and "ANY" in final_content["_admin"]["projects_read"]:
+ if (
+ not session["public"]
+ and "ANY" in final_content["_admin"]["projects_read"]
+ ):
final_content["_admin"]["projects_read"].remove("ANY")
# Change project status
if p not in final_content["_admin"]["projects_read"]:
final_content["_admin"]["projects_read"].append(p)
+ return final_content
+
def check_unique_name(self, session, name, _id=None):
"""
Check that the name is unique for this project
_filter["name"] = name
if _id:
_filter["_id.neq"] = _id
- if self.db.get_one(self.topic, _filter, fail_on_empty=False, fail_on_more=False):
- raise EngineException("name '{}' already exists for {}".format(name, self.topic), HTTPStatus.CONFLICT)
+ if self.db.get_one(
+ self.topic, _filter, fail_on_empty=False, fail_on_more=False
+ ):
+ raise EngineException(
+ "name '{}' already exists for {}".format(name, self.topic),
+ HTTPStatus.CONFLICT,
+ )
@staticmethod
def format_on_new(content, project_id=None, make_public=False):
def _send_msg(self, action, content, not_send_msg=None):
if self.topic_msg and not_send_msg is not False:
+ content = content.copy()
content.pop("_admin", None)
if isinstance(not_send_msg, list):
not_send_msg.append((self.topic_msg, action, content))
kitem_old = int(kitem)
# if index greater than list, extend the list
if kitem_old >= len(update_content):
- update_content += [None] * (kitem_old - len(update_content) + 1)
+ update_content += [None] * (
+ kitem_old - len(update_content) + 1
+ )
if not isinstance(update_content[kitem_old], (dict, list)):
update_content[kitem_old] = {}
else:
raise EngineException(
- "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(k, kitem))
+ "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(
+ k, kitem
+ )
+ )
if v is None:
del update_content[kitem_old]
else:
update_content[kitem_old] = v if not yaml_format else safe_load(v)
except KeyError:
raise EngineException(
- "Invalid query string '{}'. Descriptor does not contain '{}'".format(k, kitem_old))
+ "Invalid query string '{}'. Descriptor does not contain '{}'".format(
+ k, kitem_old
+ )
+ )
except ValueError:
- raise EngineException("Invalid query string '{}'. Expected integer index list instead of '{}'".format(
- k, kitem))
+ raise EngineException(
+ "Invalid query string '{}'. Expected integer index list instead of '{}'".format(
+ k, kitem
+ )
+ )
except IndexError:
raise EngineException(
- "Invalid query string '{}'. Index '{}' out of range".format(k, kitem_old))
+ "Invalid query string '{}'. Index '{}' out of range".format(
+ k, kitem_old
+ )
+ )
except YAMLError:
raise EngineException("Invalid query string '{}' yaml format".format(k))
- def show(self, session, _id):
+ def sol005_projection(self, data):
+ # Projection was moved to child classes
+ return data
+
+ def show(self, session, _id, filter_q=None, api_req=False):
"""
Get complete information on an topic
:param session: contains "username", "admin", "force", "public", "project_id", "set_project"
:param _id: server internal id
+ :param filter_q: dict: query parameter
+ :param api_req: True if this call is serving an external API request. False if serving internal request.
:return: dictionary, raise exception if not found.
"""
if not self.multiproject:
filter_db = self._get_project_filter(session)
# To allow project&user addressing by name AS WELL AS _id
filter_db[BaseTopic.id_field(self.topic, _id)] = _id
- return self.db.get_one(self.topic, filter_db)
+ data = self.db.get_one(self.topic, filter_db)
+
+ # Only perform SOL005 projection if we are serving an external request
+ if api_req:
+ self.sol005_projection(data)
+
+ return data
+
# TODO transform data for SOL005 URL requests
# TODO remove _admin if not admin
:param accept_header: Content of Accept header. Must contain applition/zip or/and text/plain
:return: opened file or raises an exception
"""
- raise EngineException("Method get_file not valid for this topic", HTTPStatus.INTERNAL_SERVER_ERROR)
+ raise EngineException(
+ "Method get_file not valid for this topic", HTTPStatus.INTERNAL_SERVER_ERROR
+ )
- def list(self, session, filter_q=None):
+ def list(self, session, filter_q=None, api_req=False):
"""
Get a list of the topic that matches a filter
:param session: contains the used login username and working project
:param filter_q: filter of data to be applied
+ :param api_req: True if this call is serving an external API request. False if serving internal request.
:return: The list, it can be empty if no one match the filter.
"""
if not filter_q:
# TODO transform data for SOL005 URL requests. Transform filtering
# TODO implement "field-type" query string SOL005
- return self.db.get_list(self.topic, filter_q)
+ data = self.db.get_list(self.topic, filter_q)
+
+ # Only perform SOL005 projection if we are serving an external request
+ if api_req:
+ data = [self.sol005_projection(inst) for inst in data]
+
+ return data
def new(self, rollback, session, indata=None, kwargs=None, headers=None):
"""
self._update_input_with_kwargs(content, kwargs)
content = self._validate_input_new(content, force=session["force"])
self.check_conflict_on_new(session, content)
- op_id = self.format_on_new(content, project_id=session["project_id"], make_public=session["public"])
+ op_id = self.format_on_new(
+ content, project_id=session["project_id"], make_public=session["public"]
+ )
_id = self.db.create(self.topic, content)
rollback.append({"topic": self.topic, "_id": _id})
if op_id:
:return: True package has is completely uploaded or False if partial content has been uplodaed.
Raise exception on error
"""
- raise EngineException("Method upload_content not valid for this topic", HTTPStatus.INTERNAL_SERVER_ERROR)
+ raise EngineException(
+ "Method upload_content not valid for this topic",
+ HTTPStatus.INTERNAL_SERVER_ERROR,
+ )
def delete_list(self, session, filter_q=None):
"""
self.check_conflict_on_del(session, _id, item_content)
if dry_run:
return None
-
+
if self.multiproject and session["project_id"]:
# remove reference from project_read if there are more projects referencing it. If it last one,
# do not remove reference, but delete
- other_projects_referencing = next((p for p in item_content["_admin"]["projects_read"]
- if p not in session["project_id"] and p != "ANY"), None)
+ other_projects_referencing = next(
+ (
+ p
+ for p in item_content["_admin"]["projects_read"]
+ if p not in session["project_id"] and p != "ANY"
+ ),
+ None,
+ )
# check if there are projects referencing it (apart from ANY, that means, public)....
if other_projects_referencing:
# remove references but not delete
- update_dict_pull = {"_admin.projects_read": session["project_id"],
- "_admin.projects_write": session["project_id"]}
- self.db.set_one(self.topic, filter_q, update_dict=None, pull_list=update_dict_pull)
+ update_dict_pull = {
+ "_admin.projects_read": session["project_id"],
+ "_admin.projects_write": session["project_id"],
+ }
+ self.db.set_one(
+ self.topic, filter_q, update_dict=None, pull_list=update_dict_pull
+ )
return None
else:
- can_write = next((p for p in item_content["_admin"]["projects_write"] if p == "ANY" or
- p in session["project_id"]), None)
+ can_write = next(
+ (
+ p
+ for p in item_content["_admin"]["projects_write"]
+ if p == "ANY" or p in session["project_id"]
+ ),
+ None,
+ )
if not can_write:
- raise EngineException("You have not write permission to delete it",
- http_code=HTTPStatus.UNAUTHORIZED)
+ raise EngineException(
+ "You have not write permission to delete it",
+ http_code=HTTPStatus.UNAUTHORIZED,
+ )
# delete
self.db.del_one(self.topic, filter_q)
self._update_input_with_kwargs(indata, kwargs)
try:
if indata and session.get("set_project"):
- raise EngineException("Cannot edit content and set to project (query string SET_PROJECT) at same time",
- HTTPStatus.UNPROCESSABLE_ENTITY)
- indata = self._validate_input_edit(indata, force=session["force"])
-
+ raise EngineException(
+ "Cannot edit content and set to project (query string SET_PROJECT) at same time",
+ HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
# TODO self._check_edition(session, indata, _id, force)
if not content:
content = self.show(session, _id)
+ indata = self._validate_input_edit(indata, content, force=session["force"])
deep_update_rfc7396(content, indata)
# To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
_id = content.get("_id") or _id
- self.check_conflict_on_edit(session, content, indata, _id=_id)
+ content = self.check_conflict_on_edit(session, content, indata, _id=_id)
op_id = self.format_on_edit(content, indata)
self.db.replace(self.topic, _id, content)