X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FNBI.git;a=blobdiff_plain;f=osm_nbi%2Fadmin_topics.py;h=071ed3b3d91e6b272fe7655ef1cfcae7da64997e;hp=cbe2f4c82e88c745bcdaa3f6646738c6d2de42e3;hb=refs%2Fchanges%2F66%2F7666%2F2;hpb=cf042d30e8b7a1a9cbd1b2064e83c5d20ffcec9b diff --git a/osm_nbi/admin_topics.py b/osm_nbi/admin_topics.py index cbe2f4c..071ed3b 100644 --- a/osm_nbi/admin_topics.py +++ b/osm_nbi/admin_topics.py @@ -858,7 +858,7 @@ class ProjectTopicAuth(ProjectTopic): class RoleTopicAuth(BaseTopic): topic = "roles_operations" - topic_msg = "roles" + topic_msg = None # "roles" schema_new = roles_new_schema schema_edit = roles_edit_schema multiproject = False @@ -878,25 +878,19 @@ class RoleTopicAuth(BaseTopic): :param role_definitions: role definition to test :return: None if ok, raises ValidationError exception on error """ - ignore_fields = ["_id", "_admin", "name"] - for role_def in role_definitions.keys(): + if not role_definitions.get("permissions"): + return + ignore_fields = ["admin", "default"] + for role_def in role_definitions["permissions"].keys(): if role_def in ignore_fields: continue - if role_def == "root": - if isinstance(role_definitions[role_def], bool): - continue - else: - raise ValidationError("Operation authorization \".\" should be True/False.") if role_def[-1] == ":": - raise ValidationError("Operation cannot end with \".\"") + raise ValidationError("Operation cannot end with ':'") role_def_matches = [op for op in operations if op.startswith(role_def)] if len(role_def_matches) == 0: - raise ValidationError("No matching operation found.") - - if not isinstance(role_definitions[role_def], bool): - raise ValidationError("Operation authorization {} should be True/False.".format(role_def)) + raise ValidationError("Invalid permission '{}'".format(role_def)) def _validate_input_new(self, input, force=False): """ @@ -934,11 +928,9 @@ class RoleTopicAuth(BaseTopic): :param indata: data to be inserted :return: None or raises EngineException """ - role = indata.get("name") - role_list = list(map(lambda x: x["name"], self.auth.get_role_list())) - - if role in role_list: - raise EngineException("role '{}' exists".format(role), HTTPStatus.CONFLICT) + # check name not exists + if self.db.get_one(self.topic, {"name": indata.get("name")}, fail_on_empty=False, fail_on_more=False): + raise EngineException("role name '{}' exists".format(indata["name"]), HTTPStatus.CONFLICT) def check_conflict_on_edit(self, session, final_content, edit_content, _id): """ @@ -950,12 +942,16 @@ class RoleTopicAuth(BaseTopic): :param _id: internal _id :return: None or raises EngineException """ - roles = self.auth.get_role_list() - system_admin_role = [role for role in roles - if role["name"] == "system_admin"][0] + if "default" not in final_content["permissions"]: + final_content["permissions"]["default"] = False + if "admin" not in final_content["permissions"]: + final_content["permissions"]["admin"] = False - if _id == system_admin_role["_id"]: - raise EngineException("You cannot edit system_admin role", http_code=HTTPStatus.FORBIDDEN) + # check name not exists + if "name" in edit_content: + role_name = edit_content["name"] + if self.db.get_one(self.topic, {"name": role_name, "_id.ne": _id}, fail_on_empty=False, fail_on_more=False): + raise EngineException("role name '{}' exists".format(role_name), HTTPStatus.CONFLICT) def check_conflict_on_del(self, session, _id, db_content): """ @@ -990,12 +986,13 @@ class RoleTopicAuth(BaseTopic): content["_admin"]["created"] = now content["_admin"]["modified"] = now - if ":" in content.keys(): - content["root"] = content[":"] - del content[":"] + if "permissions" not in content: + content["permissions"] = {} - if "root" not in content.keys(): - content["root"] = False + if "default" not in content["permissions"]: + content["permissions"]["default"] = False + if "admin" not in content["permissions"]: + content["permissions"]["admin"] = False @staticmethod def format_on_edit(final_content, edit_content): @@ -1008,78 +1005,69 @@ class RoleTopicAuth(BaseTopic): """ final_content["_admin"]["modified"] = time() - ignore_fields = ["_id", "name", "_admin"] - delete_keys = [key for key in final_content.keys() if key not in ignore_fields] - - for key in delete_keys: - del final_content[key] - - # Saving the role definition - for role_def, value in edit_content.items(): - final_content[role_def] = value - - if ":" in final_content.keys(): - final_content["root"] = final_content[":"] - del final_content[":"] - - if "root" not in final_content.keys(): - final_content["root"] = False - - @staticmethod - def format_on_show(content): - """ - Modifies the content of the role information to separate the role - metadata from the role definition. Eases the reading process of the - role definition. - - :param definition: role definition to be processed - """ - content["_id"] = str(content["_id"]) - - def show(self, session, _id): - """ - Get complete information on an topic - - :param session: contains "username", "admin", "force", "public", "project_id", "set_project" - :param _id: server internal id - :return: dictionary, raise exception if not found. - """ - filter_db = {"_id": _id} - - role = self.db.get_one(self.topic, filter_db) - new_role = dict(role) - self.format_on_show(new_role) - - return new_role - - def list(self, session, filter_q=None): - """ - Get a list of the topic that matches a filter - - :param session: contains "username", "admin", "force", "public", "project_id", "set_project" - :param filter_q: filter of data to be applied - :return: The list, it can be empty if no one match the filter. - """ - if not filter_q: - filter_q = {} - - if ":" in filter_q: - filter_q["root"] = filter_q[":"] - - for key in filter_q.keys(): - if key == "name": - continue - filter_q[key] = filter_q[key] in ["True", "true"] + if "permissions" not in final_content: + final_content["permissions"] = {} - roles = self.db.get_list(self.topic, filter_q) - new_roles = [] + if "default" not in final_content["permissions"]: + final_content["permissions"]["default"] = False + if "admin" not in final_content["permissions"]: + final_content["permissions"]["admin"] = False - for role in roles: - new_role = dict(role) - self.format_on_show(new_role) - new_roles.append(new_role) + # @staticmethod + # def format_on_show(content): + # """ + # Modifies the content of the role information to separate the role + # metadata from the role definition. Eases the reading process of the + # role definition. + # + # :param definition: role definition to be processed + # """ + # content["_id"] = str(content["_id"]) + # + # def show(self, session, _id): + # """ + # Get complete information on an topic + # + # :param session: contains "username", "admin", "force", "public", "project_id", "set_project" + # :param _id: server internal id + # :return: dictionary, raise exception if not found. + # """ + # filter_db = {"_id": _id} + # + # role = self.db.get_one(self.topic, filter_db) + # new_role = dict(role) + # self.format_on_show(new_role) + # + # return new_role - return new_roles + # def list(self, session, filter_q=None): + # """ + # Get a list of the topic that matches a filter + # + # :param session: contains "username", "admin", "force", "public", "project_id", "set_project" + # :param filter_q: filter of data to be applied + # :return: The list, it can be empty if no one match the filter. + # """ + # if not filter_q: + # filter_q = {} + # + # if ":" in filter_q: + # filter_q["root"] = filter_q[":"] + # + # for key in filter_q.keys(): + # if key == "name": + # continue + # filter_q[key] = filter_q[key] in ["True", "true"] + # + # roles = self.db.get_list(self.topic, filter_q) + # new_roles = [] + # + # for role in roles: + # new_role = dict(role) + # self.format_on_show(new_role) + # new_roles.append(new_role) + # + # return new_roles def new(self, rollback, session, indata=None, kwargs=None, headers=None): """ @@ -1093,16 +1081,16 @@ class RoleTopicAuth(BaseTopic): :return: _id: identity of the inserted data. """ try: - content = BaseTopic._remove_envelop(indata) + content = self._remove_envelop(indata) # Override descriptor with query string kwargs - BaseTopic._update_input_with_kwargs(content, kwargs) + self._update_input_with_kwargs(content, kwargs) content = self._validate_input_new(content, session["force"]) self.check_conflict_on_new(session, content) self.format_on_new(content, project_id=session["project_id"], make_public=session["public"]) role_name = content["name"] - role = self.auth.create_role(role_name) - content["_id"] = role["_id"] + role_id = self.auth.create_role(role_name) + content["_id"] = role_id _id = self.db.create(self.topic, content) rollback.append({"topic": self.topic, "_id": _id}) # self._send_msg("create", content) @@ -1138,19 +1126,6 @@ class RoleTopicAuth(BaseTopic): :param content: :return: _id: identity of the inserted data. """ - indata = self._remove_envelop(indata) - - # Override descriptor with query string kwargs - if kwargs: - self._update_input_with_kwargs(indata, kwargs) - try: - indata = self._validate_input_edit(indata, force=session["force"]) - - if not content: - content = self.show(session, _id) - self.check_conflict_on_edit(session, content, indata, _id=_id) - self.format_on_edit(content, indata) - self.db.replace(self.topic, _id, content) - return id - except ValidationError as e: - raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) + _id = super().edit(session, _id, indata, kwargs, content) + if indata.get("name"): + self.auth.update_role(_id, name=indata.get("name"))