fixing tests for additionalParams and NSI without osm running
[osm/NBI.git] / osm_nbi / admin_topics.py
index ea79cd4..76e4065 100644 (file)
@@ -23,6 +23,7 @@ from validation import vim_account_new_schema, vim_account_edit_schema, sdn_new_
 from validation import wim_account_new_schema, wim_account_edit_schema, roles_new_schema, roles_edit_schema
 from validation import validate_input
 from validation import ValidationError
 from validation import wim_account_new_schema, wim_account_edit_schema, roles_new_schema, roles_edit_schema
 from validation import validate_input
 from validation import ValidationError
+from validation import is_valid_uuid    # To check that User/Project Names don't look like UUIDs
 from base_topic import BaseTopic, EngineException
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
 from base_topic import BaseTopic, EngineException
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
@@ -61,8 +62,10 @@ class UserTopic(BaseTopic):
             for p in indata["projects"]:
                 if p == "admin":
                     continue
             for p in indata["projects"]:
                 if p == "admin":
                     continue
-                if not self.db.get_one("projects", {"_id": p}, fail_on_empty=False, fail_on_more=False):
-                    raise EngineException("project '{}' does not exists".format(p), HTTPStatus.CONFLICT)
+                # To allow project addressing by Name as well as ID
+                if not self.db.get_one("projects", {BaseTopic.id_field("projects", p): p}, fail_on_empty=False,
+                                       fail_on_more=False):
+                    raise EngineException("project '{}' does not exist".format(p), HTTPStatus.CONFLICT)
 
     def check_conflict_on_del(self, session, _id, force=False):
         if _id == session["username"]:
 
     def check_conflict_on_del(self, session, _id, force=False):
         if _id == session["username"]:
@@ -71,7 +74,8 @@ class UserTopic(BaseTopic):
     @staticmethod
     def format_on_new(content, project_id=None, make_public=False):
         BaseTopic.format_on_new(content, make_public=False)
     @staticmethod
     def format_on_new(content, project_id=None, make_public=False):
         BaseTopic.format_on_new(content, make_public=False)
-        content["_id"] = content["username"]
+        # Removed so that the UUID is kept, to allow User Name modification
+        # content["_id"] = content["username"]
         salt = uuid4().hex
         content["_admin"]["salt"] = salt
         if content.get("password"):
         salt = uuid4().hex
         content["_admin"]["salt"] = salt
         if content.get("password"):
@@ -89,11 +93,21 @@ class UserTopic(BaseTopic):
     def edit(self, session, _id, indata=None, kwargs=None, force=False, content=None):
         if not session["admin"]:
             raise EngineException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED)
     def edit(self, session, _id, indata=None, kwargs=None, force=False, content=None):
         if not session["admin"]:
             raise EngineException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED)
+        # Names that look like UUIDs are not allowed
+        name = (indata if indata else kwargs).get("username")
+        if is_valid_uuid(name):
+            raise EngineException("Usernames that look like UUIDs are not allowed",
+                                  http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
         return BaseTopic.edit(self, session, _id, indata=indata, kwargs=kwargs, force=force, content=content)
 
     def new(self, rollback, session, indata=None, kwargs=None, headers=None, force=False, make_public=False):
         if not session["admin"]:
             raise EngineException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED)
         return BaseTopic.edit(self, session, _id, indata=indata, kwargs=kwargs, force=force, content=content)
 
     def new(self, rollback, session, indata=None, kwargs=None, headers=None, force=False, make_public=False):
         if not session["admin"]:
             raise EngineException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED)
+        # Names that look like UUIDs are not allowed
+        name = indata["username"] if indata else kwargs["username"]
+        if is_valid_uuid(name):
+            raise EngineException("Usernames that look like UUIDs are not allowed",
+                                  http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
         return BaseTopic.new(self, rollback, session, indata=indata, kwargs=kwargs, headers=headers, force=force,
                              make_public=make_public)
 
         return BaseTopic.new(self, rollback, session, indata=indata, kwargs=kwargs, headers=headers, force=force,
                              make_public=make_public)
 
@@ -117,7 +131,8 @@ class ProjectTopic(BaseTopic):
     @staticmethod
     def format_on_new(content, project_id=None, make_public=False):
         BaseTopic.format_on_new(content, None)
     @staticmethod
     def format_on_new(content, project_id=None, make_public=False):
         BaseTopic.format_on_new(content, None)
-        content["_id"] = content["name"]
+        # Removed so that the UUID is kept, to allow Project Name modification
+        # content["_id"] = content["name"]
 
     def check_conflict_on_del(self, session, _id, force=False):
         if _id == session["project_id"]:
 
     def check_conflict_on_del(self, session, _id, force=False):
         if _id == session["project_id"]:
@@ -131,11 +146,21 @@ class ProjectTopic(BaseTopic):
     def edit(self, session, _id, indata=None, kwargs=None, force=False, content=None):
         if not session["admin"]:
             raise EngineException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED)
     def edit(self, session, _id, indata=None, kwargs=None, force=False, content=None):
         if not session["admin"]:
             raise EngineException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED)
+        # Names that look like UUIDs are not allowed
+        name = (indata if indata else kwargs).get("name")
+        if is_valid_uuid(name):
+            raise EngineException("Project names that look like UUIDs are not allowed",
+                                  http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
         return BaseTopic.edit(self, session, _id, indata=indata, kwargs=kwargs, force=force, content=content)
 
     def new(self, rollback, session, indata=None, kwargs=None, headers=None, force=False, make_public=False):
         if not session["admin"]:
             raise EngineException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED)
         return BaseTopic.edit(self, session, _id, indata=indata, kwargs=kwargs, force=force, content=content)
 
     def new(self, rollback, session, indata=None, kwargs=None, headers=None, force=False, make_public=False):
         if not session["admin"]:
             raise EngineException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED)
+        # Names that look like UUIDs are not allowed
+        name = indata["name"] if indata else kwargs["name"]
+        if is_valid_uuid(name):
+            raise EngineException("Project names that look like UUIDs are not allowed",
+                                  http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
         return BaseTopic.new(self, rollback, session, indata=indata, kwargs=kwargs, headers=headers, force=force,
                              make_public=make_public)
 
         return BaseTopic.new(self, rollback, session, indata=indata, kwargs=kwargs, headers=headers, force=force,
                              make_public=make_public)
 
@@ -692,7 +717,7 @@ class RoleTopicAuth(BaseTopic):
             if role_def[-1] == ".":
                 raise ValidationError("Operation cannot end with \".\"")
             
             if role_def[-1] == ".":
                 raise ValidationError("Operation cannot end with \".\"")
             
-            role_def_matches = [op for op in operations if op.starswith(role_def)]
+            role_def_matches = [op for op in operations if op.startswith(role_def)]
 
             if len(role_def_matches) == 0:
                 raise ValidationError("No matching operation found.")
 
             if len(role_def_matches) == 0:
                 raise ValidationError("No matching operation found.")