X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FNBI.git;a=blobdiff_plain;f=osm_nbi%2Fadmin_topics.py;h=3c3249c62000c2bb14181c87080dbaf2c7d2d598;hp=f831b6301a8ac21d3e884846af3a5778c694aaa2;hb=d010e3e44d9b06012dc741aca98054e3a87c5f22;hpb=20e74d260242c91c9836efba6a9436a159c4decc diff --git a/osm_nbi/admin_topics.py b/osm_nbi/admin_topics.py index f831b63..3c3249c 100644 --- a/osm_nbi/admin_topics.py +++ b/osm_nbi/admin_topics.py @@ -420,8 +420,8 @@ class SdnTopic(CommonVimWimSdn): input = super()._validate_input_new(input, force) return self._obtain_url(input, True) - def _validate_input_edit(self, input, force=False): - input = super()._validate_input_edit(input, force) + def _validate_input_edit(self, input, content, force=False): + input = super()._validate_input_edit(input, content, force) return self._obtain_url(input, False) @@ -679,12 +679,13 @@ class UserTopicAuth(UserTopic): except ValidationError as e: raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) - def show(self, session, _id): + def show(self, session, _id, api_req=False): """ Get complete information on an topic :param session: contains "username", "admin", "force", "public", "project_id", "set_project" :param _id: server internal id or username + :param api_req: True if this call is serving an external API request. False if serving internal request. :return: dictionary, raise exception if not found. """ # Allow _id to be a name or uuid @@ -715,10 +716,9 @@ class UserTopicAuth(UserTopic): if kwargs: BaseTopic._update_input_with_kwargs(indata, kwargs) try: - indata = self._validate_input_edit(indata, force=session["force"]) - if not content: content = self.show(session, _id) + indata = self._validate_input_edit(indata, content, force=session["force"]) self.check_conflict_on_edit(session, content, indata, _id=_id) # self.format_on_edit(content, indata) @@ -813,11 +813,12 @@ class UserTopicAuth(UserTopic): except ValidationError as e: raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) - def list(self, session, filter_q=None): + def list(self, session, filter_q=None, api_req=False): """ Get a list of the topic that matches a filter :param session: contains "username", "admin", "force", "public", "project_id", "set_project" :param filter_q: filter of data to be applied + :param api_req: True if this call is serving an external API request. False if serving internal request. :return: The list, it can be empty if no one match the filter. """ user_list = self.auth.get_user_list(filter_q) @@ -964,12 +965,13 @@ class ProjectTopicAuth(ProjectTopic): except ValidationError as e: raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) - def show(self, session, _id): + def show(self, session, _id, api_req=False): """ Get complete information on an topic :param session: contains "username", "admin", "force", "public", "project_id", "set_project" :param _id: server internal id + :param api_req: True if this call is serving an external API request. False if serving internal request. :return: dictionary, raise exception if not found. """ # Allow _id to be a name or uuid @@ -983,7 +985,7 @@ class ProjectTopicAuth(ProjectTopic): else: raise EngineException("Project not found", HTTPStatus.NOT_FOUND) - def list(self, session, filter_q=None): + def list(self, session, filter_q=None, api_req=False): """ Get a list of the topic that matches a filter @@ -1035,10 +1037,9 @@ class ProjectTopicAuth(ProjectTopic): if kwargs: BaseTopic._update_input_with_kwargs(indata, kwargs) try: - indata = self._validate_input_edit(indata, force=session["force"]) - if not content: content = self.show(session, _id) + indata = self._validate_input_edit(indata, content, force=session["force"]) self.check_conflict_on_edit(session, content, indata, _id=_id) self.format_on_edit(content, indata) @@ -1080,9 +1081,9 @@ class RoleTopicAuth(BaseTopic): if role_def[-1] == ":": raise ValidationError("Operation cannot end with ':'") - role_def_matches = [op for op in operations if op.startswith(role_def)] + match = next((op for op in operations if op == role_def or op.startswith(role_def + ":")), None) - if len(role_def_matches) == 0: + if not match: raise ValidationError("Invalid permission '{}'".format(role_def)) def _validate_input_new(self, input, force=False): @@ -1099,7 +1100,7 @@ class RoleTopicAuth(BaseTopic): return input - def _validate_input_edit(self, input, force=False): + def _validate_input_edit(self, input, content, force=False): """ Validates input user content for updating an entry. @@ -1233,12 +1234,13 @@ class RoleTopicAuth(BaseTopic): final_content["permissions"]["admin"] = False return None - def show(self, session, _id): + def show(self, session, _id, api_req=False): """ Get complete information on an topic :param session: contains "username", "admin", "force", "public", "project_id", "set_project" :param _id: server internal id + :param api_req: True if this call is serving an external API request. False if serving internal request. :return: dictionary, raise exception if not found. """ filter_q = {BaseTopic.id_field(self.topic, _id): _id} @@ -1250,7 +1252,7 @@ class RoleTopicAuth(BaseTopic): raise AuthconnConflictException("Found more than one role with filter {}".format(filter_q)) return roles[0] - def list(self, session, filter_q=None): + def list(self, session, filter_q=None, api_req=False): """ Get a list of the topic that matches a filter @@ -1335,9 +1337,9 @@ class RoleTopicAuth(BaseTopic): if kwargs: self._update_input_with_kwargs(indata, kwargs) try: - indata = self._validate_input_edit(indata, force=session["force"]) if not content: content = self.show(session, _id) + indata = self._validate_input_edit(indata, content, force=session["force"]) deep_update_rfc7396(content, indata) self.check_conflict_on_edit(session, content, indata, _id=_id) self.format_on_edit(content, indata)