vim_account_new_schema, vim_account_edit_schema, sdn_new_schema, sdn_edit_schema, \
wim_account_new_schema, wim_account_edit_schema, roles_new_schema, roles_edit_schema, \
k8scluster_new_schema, k8scluster_edit_schema, k8srepo_new_schema, k8srepo_edit_schema, \
- validate_input, ValidationError, is_valid_uuid # To check that User/Project Names don't look like UUIDs
+ osmrepo_new_schema, osmrepo_edit_schema, \
+ validate_input, ValidationError, is_valid_uuid # To check that User/Project Names don't look like UUIDs
from osm_nbi.base_topic import BaseTopic, EngineException
from osm_nbi.authconn import AuthconnNotFoundException, AuthconnConflictException
from osm_common.dbbase import deep_update_rfc7396
# do not remove reference, but order via kafka to delete it
if session["project_id"] and session["project_id"]:
other_projects_referencing = next((p for p in db_content["_admin"]["projects_read"]
- if p not in session["project_id"]), None)
+ if p not in session["project_id"] and p != "ANY"), None)
# check if there are projects referencing it (apart from ANY, that means, public)....
if other_projects_referencing:
# remove references but not delete
- update_dict_pull = {"_admin.projects_read.{}".format(p): None for p in session["project_id"]}
- update_dict_pull.update({"_admin.projects_write.{}".format(p): None for p in session["project_id"]})
- self.db.set_one(self.topic, filter_q, update_dict=None, pull=update_dict_pull)
+ update_dict_pull = {"_admin.projects_read": session["project_id"],
+ "_admin.projects_write": session["project_id"]}
+ self.db.set_one(self.topic, filter_q, update_dict=None, pull_list=update_dict_pull)
return None
else:
can_write = next((p for p in db_content["_admin"]["projects_write"] if p == "ANY" or
class SdnTopic(CommonVimWimSdn):
topic = "sdns"
topic_msg = "sdn"
+ quota_name = "sdn_controllers"
schema_new = sdn_new_schema
schema_edit = sdn_edit_schema
multiproject = True
input = super()._validate_input_new(input, force)
return self._obtain_url(input, True)
- def _validate_input_edit(self, input, force=False):
- input = super()._validate_input_edit(input, force)
+ def _validate_input_edit(self, input, content, force=False):
+ input = super()._validate_input_edit(input, content, force)
return self._obtain_url(input, False)
final_content["_admin"][rlist] = []
final_content["_admin"][rlist] += repos[k]
+ def check_conflict_on_del(self, session, _id, db_content):
+ """
+ Check if deletion can be done because of dependencies if it is not force. To override
+ :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
+ :param _id: internal _id
+ :param db_content: The database content of this item _id
+ :return: None if ok or raises EngineException with the conflict
+ """
+ if session["force"]:
+ return
+ # check if used by VNF
+ filter_q = {"kdur.k8s-cluster.id": _id}
+ if session["project_id"]:
+ filter_q["_admin.projects_read.cont"] = session["project_id"]
+ if self.db.get_list("vnfrs", filter_q):
+ raise EngineException("There is at least one VNF using this k8scluster", http_code=HTTPStatus.CONFLICT)
+ super().check_conflict_on_del(session, _id, db_content)
+
class K8sRepoTopic(CommonVimWimSdn):
topic = "k8srepos"
return oid
+class OsmRepoTopic(BaseTopic):
+ topic = "osmrepos"
+ topic_msg = "osmrepos"
+ schema_new = osmrepo_new_schema
+ schema_edit = osmrepo_edit_schema
+ multiproject = True
+ # TODO: Implement user/password
+
+
class UserTopicAuth(UserTopic):
# topic = "users"
# topic_msg = "users"
Get complete information on an topic
:param session: contains "username", "admin", "force", "public", "project_id", "set_project"
- :param _id: server internal id
+ :param _id: server internal id or username
:return: dictionary, raise exception if not found.
"""
# Allow _id to be a name or uuid
if len(users) == 1:
return users[0]
elif len(users) > 1:
- raise EngineException("Too many users found", HTTPStatus.CONFLICT)
+ raise EngineException("Too many users found for '{}'".format(_id), HTTPStatus.CONFLICT)
else:
- raise EngineException("User not found", HTTPStatus.NOT_FOUND)
+ raise EngineException("User '{}' not found".format(_id), HTTPStatus.NOT_FOUND)
def edit(self, session, _id, indata=None, kwargs=None, content=None):
"""
if kwargs:
BaseTopic._update_input_with_kwargs(indata, kwargs)
try:
- indata = self._validate_input_edit(indata, force=session["force"])
-
if not content:
content = self.show(session, _id)
+ indata = self._validate_input_edit(indata, content, force=session["force"])
self.check_conflict_on_edit(session, content, indata, _id=_id)
# self.format_on_edit(content, indata)
if kwargs:
BaseTopic._update_input_with_kwargs(indata, kwargs)
try:
- indata = self._validate_input_edit(indata, force=session["force"])
-
if not content:
content = self.show(session, _id)
+ indata = self._validate_input_edit(indata, content, force=session["force"])
self.check_conflict_on_edit(session, content, indata, _id=_id)
self.format_on_edit(content, indata)
return input
- def _validate_input_edit(self, input, force=False):
+ def _validate_input_edit(self, input, content, force=False):
"""
Validates input user content for updating an entry.
if kwargs:
self._update_input_with_kwargs(indata, kwargs)
try:
- indata = self._validate_input_edit(indata, force=session["force"])
if not content:
content = self.show(session, _id)
+ indata = self._validate_input_edit(indata, content, force=session["force"])
deep_update_rfc7396(content, indata)
self.check_conflict_on_edit(session, content, indata, _id=_id)
self.format_on_edit(content, indata)