vim_account_new_schema, vim_account_edit_schema, sdn_new_schema, sdn_edit_schema, \
wim_account_new_schema, wim_account_edit_schema, roles_new_schema, roles_edit_schema, \
k8scluster_new_schema, k8scluster_edit_schema, k8srepo_new_schema, k8srepo_edit_schema, \
+ vca_new_schema, vca_edit_schema, \
osmrepo_new_schema, osmrepo_edit_schema, \
validate_input, ValidationError, is_valid_uuid # To check that User/Project Names don't look like UUIDs
from osm_nbi.base_topic import BaseTopic, EngineException
if not session["force"] and edit_content.get("name"):
self.check_unique_name(session, edit_content["name"], _id=_id)
+ return final_content
+
def format_on_edit(self, final_content, edit_content):
"""
Modifies final_content inserting admin information upon edition
return oid
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
- super(CommonVimWimSdn, self).check_conflict_on_edit(session, final_content, edit_content, _id)
- super().check_conflict_on_edit(session, final_content, edit_content, _id)
+ final_content = super(CommonVimWimSdn, self).check_conflict_on_edit(session, final_content, edit_content, _id)
+ final_content = super().check_conflict_on_edit(session, final_content, edit_content, _id)
# Update Helm/Juju Repo lists
repos = {"helm-chart": [], "juju-bundle": []}
for proj in session.get("set_project", []):
if rlist not in final_content["_admin"]:
final_content["_admin"][rlist] = []
final_content["_admin"][rlist] += repos[k]
+ return final_content
def check_conflict_on_del(self, session, _id, db_content):
"""
super().check_conflict_on_del(session, _id, db_content)
+class VcaTopic(CommonVimWimSdn):
+ topic = "vca"
+ topic_msg = "vca"
+ schema_new = vca_new_schema
+ schema_edit = vca_edit_schema
+ multiproject = True
+ password_to_encrypt = None
+
+ def format_on_new(self, content, project_id=None, make_public=False):
+ oid = super().format_on_new(content, project_id, make_public)
+ content["schema_version"] = schema_version = "1.11"
+ for key in ["secret", "cacert"]:
+ content[key] = self.db.encrypt(
+ content[key],
+ schema_version=schema_version,
+ salt=content["_id"]
+ )
+ return oid
+
+ def format_on_edit(self, final_content, edit_content):
+ oid = super().format_on_edit(final_content, edit_content)
+ schema_version = final_content.get("schema_version")
+ for key in ["secret", "cacert"]:
+ if key in edit_content:
+ final_content[key] = self.db.encrypt(
+ edit_content[key],
+ schema_version=schema_version,
+ salt=final_content["_id"]
+ )
+ return oid
+
+ def check_conflict_on_del(self, session, _id, db_content):
+ """
+ Check if deletion can be done because of dependencies if it is not force. To override
+ :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
+ :param _id: internal _id
+ :param db_content: The database content of this item _id
+ :return: None if ok or raises EngineException with the conflict
+ """
+ if session["force"]:
+ return
+ # check if used by VNF
+ filter_q = {"vca": _id}
+ if session["project_id"]:
+ filter_q["_admin.projects_read.cont"] = session["project_id"]
+ if self.db.get_list("vim_accounts", filter_q):
+ raise EngineException("There is at least one VIM account using this vca", http_code=HTTPStatus.CONFLICT)
+ super().check_conflict_on_del(session, _id, db_content)
+
+
class K8sRepoTopic(CommonVimWimSdn):
topic = "k8srepos"
topic_msg = "k8srepo"
raise EngineException("You cannot remove system_admin role from admin user",
http_code=HTTPStatus.FORBIDDEN)
+ return final_content
+
def check_conflict_on_del(self, session, _id, db_content):
"""
Check if deletion can be done because of dependencies if it is not force. To override
if not content:
content = self.show(session, _id)
indata = self._validate_input_edit(indata, content, force=session["force"])
- self.check_conflict_on_edit(session, content, indata, _id=_id)
+ content = self.check_conflict_on_edit(session, content, indata, _id=_id)
# self.format_on_edit(content, indata)
if not ("password" in indata or "username" in indata or indata.get("remove_project_role_mappings") or
# Check that project name is not used, regardless keystone already checks this
if project_name and self.auth.get_project_list(filter_q={"name": project_name}):
raise EngineException("project '{}' is already used".format(project_name), HTTPStatus.CONFLICT)
+ return final_content
def check_conflict_on_del(self, session, _id, db_content):
"""
if not content:
content = self.show(session, _id)
indata = self._validate_input_edit(indata, content, force=session["force"])
- self.check_conflict_on_edit(session, content, indata, _id=_id)
+ content = self.check_conflict_on_edit(session, content, indata, _id=_id)
self.format_on_edit(content, indata)
content_original = copy.deepcopy(content)
deep_update_rfc7396(content, indata)
if roles and roles[0][BaseTopic.id_field("roles", _id)] != _id:
raise EngineException("role name '{}' exists".format(role_name), HTTPStatus.CONFLICT)
+ return final_content
+
def check_conflict_on_del(self, session, _id, db_content):
"""
Check if deletion can be done because of dependencies if it is not force. To override
content = self.show(session, _id)
indata = self._validate_input_edit(indata, content, force=session["force"])
deep_update_rfc7396(content, indata)
- self.check_conflict_on_edit(session, content, indata, _id=_id)
+ content = self.check_conflict_on_edit(session, content, indata, _id=_id)
self.format_on_edit(content, indata)
self.auth.update_role(content)
except ValidationError as e: