vim_account_new_schema, vim_account_edit_schema, sdn_new_schema, sdn_edit_schema, \
wim_account_new_schema, wim_account_edit_schema, roles_new_schema, roles_edit_schema, \
k8scluster_new_schema, k8scluster_edit_schema, k8srepo_new_schema, k8srepo_edit_schema, \
- validate_input, ValidationError, is_valid_uuid # To check that User/Project Names don't look like UUIDs
+ osmrepo_new_schema, osmrepo_edit_schema, \
+ validate_input, ValidationError, is_valid_uuid # To check that User/Project Names don't look like UUIDs
from osm_nbi.base_topic import BaseTopic, EngineException
from osm_nbi.authconn import AuthconnNotFoundException, AuthconnConflictException
from osm_common.dbbase import deep_update_rfc7396
if dry_run:
return None
- # remove reference from project_read. If not last delete
- if session["project_id"]:
- for project_id in session["project_id"]:
- if project_id in db_content["_admin"]["projects_read"]:
- db_content["_admin"]["projects_read"].remove(project_id)
- if project_id in db_content["_admin"]["projects_write"]:
- db_content["_admin"]["projects_write"].remove(project_id)
- else:
- db_content["_admin"]["projects_read"].clear()
- db_content["_admin"]["projects_write"].clear()
-
- update_dict = {"_admin.projects_read": db_content["_admin"]["projects_read"],
- "_admin.projects_write": db_content["_admin"]["projects_write"]
- }
-
- # check if there are projects referencing it (apart from ANY that means public)....
- if db_content["_admin"]["projects_read"] and (len(db_content["_admin"]["projects_read"]) > 1 or
- db_content["_admin"]["projects_read"][0] != "ANY"):
- self.db.set_one(self.topic, filter_q, update_dict=update_dict) # remove references but not delete
- return None
+ # remove reference from project_read if there are more projects referencing it. If it last one,
+ # do not remove reference, but order via kafka to delete it
+ if session["project_id"] and session["project_id"]:
+ other_projects_referencing = next((p for p in db_content["_admin"]["projects_read"]
+ if p not in session["project_id"] and p != "ANY"), None)
+
+ # check if there are projects referencing it (apart from ANY, that means, public)....
+ if other_projects_referencing:
+ # remove references but not delete
+ update_dict_pull = {"_admin.projects_read": session["project_id"],
+ "_admin.projects_write": session["project_id"]}
+ self.db.set_one(self.topic, filter_q, update_dict=None, pull_list=update_dict_pull)
+ return None
+ else:
+ can_write = next((p for p in db_content["_admin"]["projects_write"] if p == "ANY" or
+ p in session["project_id"]), None)
+ if not can_write:
+ raise EngineException("You have not write permission to delete it",
+ http_code=HTTPStatus.UNAUTHORIZED)
# It must be deleted
if session["force"]:
op_id = None
self._send_msg("deleted", {"_id": _id, "op_id": op_id}, not_send_msg=not_send_msg)
else:
- update_dict["_admin.to_delete"] = True
+ update_dict = {"_admin.to_delete": True}
self.db.set_one(self.topic, {"_id": _id},
update_dict=update_dict,
push={"_admin.operations": self._create_operation("delete")}
class SdnTopic(CommonVimWimSdn):
topic = "sdns"
topic_msg = "sdn"
+ quota_name = "sdn_controllers"
schema_new = sdn_new_schema
schema_edit = sdn_edit_schema
multiproject = True
input = super()._validate_input_new(input, force)
return self._obtain_url(input, True)
- def _validate_input_edit(self, input, force=False):
- input = super()._validate_input_edit(input, force)
+ def _validate_input_edit(self, input, content, force=False):
+ input = super()._validate_input_edit(input, content, force)
return self._obtain_url(input, False)
final_content["_admin"][rlist] = []
final_content["_admin"][rlist] += repos[k]
+ def check_conflict_on_del(self, session, _id, db_content):
+ """
+ Check if deletion can be done because of dependencies if it is not force. To override
+ :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
+ :param _id: internal _id
+ :param db_content: The database content of this item _id
+ :return: None if ok or raises EngineException with the conflict
+ """
+ if session["force"]:
+ return
+ # check if used by VNF
+ filter_q = {"kdur.k8s-cluster.id": _id}
+ if session["project_id"]:
+ filter_q["_admin.projects_read.cont"] = session["project_id"]
+ if self.db.get_list("vnfrs", filter_q):
+ raise EngineException("There is at least one VNF using this k8scluster", http_code=HTTPStatus.CONFLICT)
+ super().check_conflict_on_del(session, _id, db_content)
+
class K8sRepoTopic(CommonVimWimSdn):
topic = "k8srepos"
return oid
+class OsmRepoTopic(BaseTopic):
+ topic = "osmrepos"
+ topic_msg = "osmrepos"
+ schema_new = osmrepo_new_schema
+ schema_edit = osmrepo_edit_schema
+ multiproject = True
+ # TODO: Implement user/password
+
+
class UserTopicAuth(UserTopic):
# topic = "users"
# topic_msg = "users"
Get complete information on an topic
:param session: contains "username", "admin", "force", "public", "project_id", "set_project"
- :param _id: server internal id
+ :param _id: server internal id or username
:return: dictionary, raise exception if not found.
"""
# Allow _id to be a name or uuid
- filter_q = {self.id_field(self.topic, _id): _id}
+ filter_q = {"username": _id}
# users = self.auth.get_user_list(filter_q)
users = self.list(session, filter_q) # To allow default filtering (Bug 853)
if len(users) == 1:
return users[0]
elif len(users) > 1:
- raise EngineException("Too many users found", HTTPStatus.CONFLICT)
+ raise EngineException("Too many users found for '{}'".format(_id), HTTPStatus.CONFLICT)
else:
- raise EngineException("User not found", HTTPStatus.NOT_FOUND)
+ raise EngineException("User '{}' not found".format(_id), HTTPStatus.NOT_FOUND)
def edit(self, session, _id, indata=None, kwargs=None, content=None):
"""
if kwargs:
BaseTopic._update_input_with_kwargs(indata, kwargs)
try:
- indata = self._validate_input_edit(indata, force=session["force"])
-
if not content:
content = self.show(session, _id)
+ indata = self._validate_input_edit(indata, content, force=session["force"])
self.check_conflict_on_edit(session, content, indata, _id=_id)
# self.format_on_edit(content, indata)
if kwargs:
BaseTopic._update_input_with_kwargs(indata, kwargs)
try:
- indata = self._validate_input_edit(indata, force=session["force"])
-
if not content:
content = self.show(session, _id)
+ indata = self._validate_input_edit(indata, content, force=session["force"])
self.check_conflict_on_edit(session, content, indata, _id=_id)
self.format_on_edit(content, indata)
schema_edit = roles_edit_schema
multiproject = False
- def __init__(self, db, fs, msg, auth, ops):
+ def __init__(self, db, fs, msg, auth):
BaseTopic.__init__(self, db, fs, msg, auth)
# self.auth = auth
- self.operations = ops
+ self.operations = auth.role_permissions
# self.topic = "roles_operations" if isinstance(auth, AuthconnKeystone) else "roles"
@staticmethod
return input
- def _validate_input_edit(self, input, force=False):
+ def _validate_input_edit(self, input, content, force=False):
"""
Validates input user content for updating an entry.
if kwargs:
self._update_input_with_kwargs(indata, kwargs)
try:
- indata = self._validate_input_edit(indata, force=session["force"])
if not content:
content = self.show(session, _id)
+ indata = self._validate_input_edit(indata, content, force=session["force"])
deep_update_rfc7396(content, indata)
self.check_conflict_on_edit(session, content, indata, _id=_id)
self.format_on_edit(content, indata)