More unit tests for descriptor_topics
[osm/NBI.git] / osm_nbi / admin_topics.py
index b41e457..ada9c7b 100644 (file)
@@ -21,6 +21,7 @@ from time import time
 from osm_nbi.validation import user_new_schema, user_edit_schema, project_new_schema, project_edit_schema, \
     vim_account_new_schema, vim_account_edit_schema, sdn_new_schema, sdn_edit_schema, \
     wim_account_new_schema, wim_account_edit_schema, roles_new_schema, roles_edit_schema, \
+    k8scluster_new_schema, k8scluster_edit_schema, k8srepo_new_schema, k8srepo_edit_schema, \
     validate_input, ValidationError, is_valid_uuid    # To check that User/Project Names don't look like UUIDs
 from osm_nbi.base_topic import BaseTopic, EngineException
 from osm_nbi.authconn import AuthconnNotFoundException, AuthconnConflictException
@@ -247,6 +248,7 @@ class CommonVimWimSdn(BaseTopic):
         :param edit_content: user requested update content
         :return: operation id
         """
+        super().format_on_edit(final_content, edit_content)
 
         # encrypt passwords
         schema_version = final_content.get("schema_version")
@@ -387,6 +389,40 @@ class SdnTopic(CommonVimWimSdn):
     config_to_encrypt = {}
 
 
+class K8sClusterTopic(CommonVimWimSdn):
+    topic = "k8sclusters"
+    topic_msg = "k8scluster"
+    schema_new = k8scluster_new_schema
+    schema_edit = k8scluster_edit_schema
+    multiproject = True
+    password_to_encrypt = None
+    config_to_encrypt = {}
+
+    def format_on_new(self, content, project_id=None, make_public=False):
+        oid = super().format_on_new(content, project_id, make_public)
+        self.db.encrypt_decrypt_fields(content["credentials"], 'encrypt', ['password', 'secret'],
+                                       schema_version=content["schema_version"], salt=content["_id"])
+        return oid
+
+    def format_on_edit(self, final_content, edit_content):
+        if final_content.get("schema_version") and edit_content.get("credentials"):
+            self.db.encrypt_decrypt_fields(edit_content["credentials"], 'encrypt', ['password', 'secret'],
+                                           schema_version=final_content["schema_version"], salt=final_content["_id"])
+            deep_update_rfc7396(final_content["credentials"], edit_content["credentials"])
+        oid = super().format_on_edit(final_content, edit_content)
+        return oid
+
+
+class K8sRepoTopic(CommonVimWimSdn):
+    topic = "k8srepos"
+    topic_msg = "k8srepo"
+    schema_new = k8srepo_new_schema
+    schema_edit = k8srepo_edit_schema
+    multiproject = True
+    password_to_encrypt = None
+    config_to_encrypt = {}
+
+
 class UserTopicAuth(UserTopic):
     # topic = "users"
     # topic_msg = "users"
@@ -744,7 +780,7 @@ class ProjectTopicAuth(ProjectTopic):
         project_name = edit_content.get("name")
         if project_name != final_content["name"]:  # It is a true renaming
             if is_valid_uuid(project_name):
-                raise EngineException("project name  '{}' cannot have an uuid format".format(project_name),
+                raise EngineException("project name '{}' cannot have an uuid format".format(project_name),
                                       HTTPStatus.UNPROCESSABLE_ENTITY)
 
             if final_content["name"] == "admin":
@@ -975,6 +1011,11 @@ class RoleTopicAuth(BaseTopic):
         :param indata: data to be inserted
         :return: None or raises EngineException
         """
+        # check name is not uuid
+        role_name = indata.get("name")
+        if is_valid_uuid(role_name):
+            raise EngineException("role name '{}' cannot have an uuid format".format(role_name),
+                                  HTTPStatus.UNPROCESSABLE_ENTITY)
         # check name not exists
         name = indata["name"]
         # if self.db.get_one(self.topic, {"name": indata.get("name")}, fail_on_empty=False, fail_on_more=False):
@@ -996,6 +1037,17 @@ class RoleTopicAuth(BaseTopic):
         if "admin" not in final_content["permissions"]:
             final_content["permissions"]["admin"] = False
 
+        # check name is not uuid
+        role_name = edit_content.get("name")
+        if is_valid_uuid(role_name):
+            raise EngineException("role name '{}' cannot have an uuid format".format(role_name),
+                                  HTTPStatus.UNPROCESSABLE_ENTITY)
+
+        # Check renaming of admin roles
+        role = self.auth.get_role(_id)
+        if role["name"] in ["system_admin", "project_admin"]:
+            raise EngineException("You cannot rename role '{}'".format(role["name"]), http_code=HTTPStatus.FORBIDDEN)
+
         # check name not exists
         if "name" in edit_content:
             role_name = edit_content["name"]