Addition of PaaS
[osm/NBI.git] / osm_nbi / tests / test_admin_topics.py
index 8124ce4..ae50198 100755 (executable)
@@ -32,6 +32,7 @@ from osm_nbi.admin_topics import (
     UserTopicAuth,
     CommonVimWimSdn,
     VcaTopic,
+    PaasTopic,
 )
 from osm_nbi.engine import EngineException
 from osm_nbi.authconn import AuthconnNotFoundException
@@ -165,6 +166,105 @@ class TestVcaTopic(TestCase):
         mock_check_conflict_on_del.assert_not_called()
 
 
+class TestPaaSTopic(TestCase):
+    def setUp(self):
+        self.db = Mock(dbbase.DbBase())
+        self.fs = Mock(fsbase.FsBase())
+        self.msg = Mock(msgbase.MsgBase())
+        self.auth = Mock(authconn.Authconn(None, None, None))
+        self.paas_topic = PaasTopic(self.db, self.fs, self.msg, self.auth)
+
+    def test_format_on_new(self):
+        content = {
+            "_id": "id",
+            "secret": "secret_to_encrypt",
+        }
+        self.db.encrypt.side_effect = ["encrypted_secret"]
+
+        expecte_oid = "id:0"
+        expected_num_operations = 1
+        oid = self.paas_topic.format_on_new(content)
+
+        self.assertEqual(oid, expecte_oid)
+        self.assertEqual(content["secret"], "encrypted_secret")
+        self.assertEqual(content["_admin"]["operationalState"], "PROCESSING")
+        self.assertEqual(content["_admin"]["current_operation"], None)
+        self.assertEqual(len(content["_admin"]["operations"]), expected_num_operations)
+        self.assertEqual(
+            content["_admin"]["operations"][0]["lcmOperationType"], "create"
+        )
+        self.db.encrypt.assert_called_with(
+            "secret_to_encrypt", schema_version="1.11", salt="id"
+        )
+
+    @patch("osm_nbi.base_topic.BaseTopic._get_project_filter")
+    def test_check_conflict_on_new(self, mock_get_project_filter):
+        indata = {"name": "new_paas_name"}
+        session = {}
+        mock_get_project_filter.return_value = {}
+        self.db.get_one.return_value = None
+        self.paas_topic.check_conflict_on_new(session, indata)
+
+    @patch("osm_nbi.base_topic.BaseTopic._get_project_filter")
+    def test_check_conflict_on_new_raise_exception(self, mock_get_project_filter):
+        indata = {"name": "new_paas_name"}
+        session = {}
+        mock_get_project_filter.return_value = {}
+        self.db.get_one.return_value = ["Found_PaaS"]
+        with self.assertRaises(EngineException):
+            self.paas_topic.check_conflict_on_new(session, indata)
+
+    @patch("osm_nbi.base_topic.BaseTopic._get_project_filter")
+    def test_check_conflict_on_edit(self, mock_get_project_filter):
+        edit_content = {"name": "new_paas_name"}
+        final_content = {}
+        session = {"force": None}
+        mock_get_project_filter.return_value = {}
+        self.db.get_one.return_value = None
+        self.paas_topic.check_conflict_on_edit(
+            session, final_content, edit_content, "id"
+        )
+
+    @patch("osm_nbi.base_topic.BaseTopic._get_project_filter")
+    def test_check_conflict_on_edit_raise_exception(self, mock_get_project_filter):
+        edit_content = {"name": "new_paas_name"}
+        final_content = {}
+        session = {"force": None}
+        mock_get_project_filter.return_value = {}
+        self.db.get_one.return_value = ["Found_PaaS"]
+        with self.assertRaises(EngineException):
+            self.paas_topic.check_conflict_on_edit(
+                session, final_content, edit_content, "id"
+            )
+
+    def test_format_on_edit(self):
+        edit_content = {
+            "_id": "id",
+            "secret": "secret_to_encrypt",
+        }
+        final_content = {
+            "_id": "id",
+            "_admin": {"operations": [{"lcmOperationType": "create"}]},
+            "schema_version": "1.11",
+        }
+        self.db.encrypt.side_effect = ["encrypted_secret"]
+        expected_oid = "id:1"
+        expected_num_operations = 2
+        print(self.paas_topic.password_to_encrypt)
+        oid = self.paas_topic.format_on_edit(final_content, edit_content)
+
+        self.assertEqual(oid, expected_oid)
+        self.assertEqual(final_content["secret"], "encrypted_secret")
+        self.assertEqual(
+            len(final_content["_admin"]["operations"]), expected_num_operations
+        )
+        self.assertEqual(final_content["_admin"]["operationalState"], "PROCESSING")
+        self.assertEqual(final_content["_admin"]["detailed-status"], "Editing")
+        self.db.encrypt.assert_called_with(
+            "secret_to_encrypt", schema_version="1.11", salt="id"
+        )
+
+
 class Test_ProjectTopicAuth(TestCase):
     @classmethod
     def setUpClass(cls):
@@ -1013,7 +1113,9 @@ class Test_UserTopicAuth(TestCase):
                 },
             )
             content = self.auth.update_user.call_args[0][0]
-            self.assertEqual(content["old_password"], old_password, "Wrong old password")
+            self.assertEqual(
+                content["old_password"], old_password, "Wrong old password"
+            )
             self.assertEqual(content["password"], new_pasw, "Wrong user password")
 
     def test_delete_user(self):