Fix bug 2088 by validating helm-chart value on VNF
[osm/NBI.git] / osm_nbi / tests / test_descriptor_topics.py
index 280c93f..00b18a5 100755 (executable)
@@ -17,6 +17,7 @@
 __author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
 __date__ = "2019-11-20"
 
+from contextlib import contextmanager
 import unittest
 from unittest import TestCase
 from unittest.mock import Mock, patch
@@ -26,11 +27,21 @@ from copy import deepcopy
 from time import time
 from osm_common import dbbase, fsbase, msgbase
 from osm_nbi import authconn
-from osm_nbi.tests.test_pkg_descriptors import db_vnfds_text, db_nsds_text
+from osm_nbi.tests.test_pkg_descriptors import (
+    db_vnfds_text,
+    db_nsds_text,
+    vnfd_exploit_text,
+    vnfd_exploit_fixed_text,
+)
 from osm_nbi.descriptor_topics import VnfdTopic, NsdTopic
 from osm_nbi.engine import EngineException
 from osm_common.dbbase import DbException
 import yaml
+import tempfile
+import collections
+import collections.abc
+
+collections.MutableSequence = collections.abc.MutableSequence
 
 test_name = "test-user"
 db_vnfd_content = yaml.load(db_vnfds_text, Loader=yaml.Loader)[0]
@@ -45,11 +56,23 @@ fake_session = {
     "public": False,
     "allow_show_user_project_role": True,
 }
+UUID = "00000000-0000-0000-0000-000000000000"
+
+
+def admin_value():
+    return {"projects_read": []}
+
 
+def setup_mock_fs(fs):
+    fs.path = ""
+    fs.get_params.return_value = {}
+    fs.file_exists.return_value = False
+    fs.file_open.side_effect = lambda path, mode: tempfile.TemporaryFile(mode="a+b")
 
-def norm(str):
+
+def norm(s: str):
     """Normalize string for checking"""
-    return " ".join(str.strip().split()).lower()
+    return " ".join(s.strip().split()).lower()
 
 
 def compare_desc(tc, d1, d2, k):
@@ -92,6 +115,154 @@ class Test_VnfdTopic(TestCase):
         self.topic = VnfdTopic(self.db, self.fs, self.msg, self.auth)
         self.topic.check_quota = Mock(return_value=None)  # skip quota
 
+    @contextmanager
+    def assertNotRaises(self, exception_type=Exception):
+        try:
+            yield None
+        except exception_type:
+            raise self.failureException("{} raised".format(exception_type.__name__))
+
+    def create_desc_temp(self, template):
+        old_desc = deepcopy(template)
+        new_desc = deepcopy(template)
+        return old_desc, new_desc
+
+    def prepare_vnfd_creation(self):
+        setup_mock_fs(self.fs)
+        test_vnfd = deepcopy(db_vnfd_content)
+        did = db_vnfd_content["_id"]
+        self.db.create.return_value = did
+        self.db.get_one.side_effect = [
+            {"_id": did, "_admin": deepcopy(db_vnfd_content["_admin"])},
+            None,
+        ]
+        return did, test_vnfd
+
+    def prepare_vnfd(self, vnfd_text):
+        setup_mock_fs(self.fs)
+        test_vnfd = yaml.safe_load(vnfd_text)
+        self.db.create.return_value = UUID
+        self.db.get_one.side_effect = [
+            {"_id": UUID, "_admin": admin_value()},
+            None,
+        ]
+        return UUID, test_vnfd
+
+    def prepare_test_vnfd(self, test_vnfd):
+        del test_vnfd["_id"]
+        del test_vnfd["_admin"]
+        del test_vnfd["vdu"][0]["cloud-init-file"]
+        del test_vnfd["df"][0]["lcm-operations-configuration"]["operate-vnf-op-config"][
+            "day1-2"
+        ][0]["execution-environment-list"][0]["juju"]
+        return test_vnfd
+
+    @patch("osm_nbi.descriptor_topics.shutil")
+    @patch("osm_nbi.descriptor_topics.os.rename")
+    def test_new_vnfd_normal_creation(self, mock_rename, mock_shutil):
+        did, test_vnfd = self.prepare_vnfd_creation()
+        test_vnfd = self.prepare_test_vnfd(test_vnfd)
+        rollback = []
+        did2, oid = self.topic.new(rollback, fake_session, {})
+        db_args = self.db.create.call_args[0]
+        msg_args = self.msg.write.call_args[0]
+
+        self.assertEqual(len(rollback), 1, "Wrong rollback length")
+        self.assertEqual(msg_args[0], self.topic.topic_msg, "Wrong message topic")
+        self.assertEqual(msg_args[1], "created", "Wrong message action")
+        self.assertEqual(msg_args[2], {"_id": did}, "Wrong message content")
+        self.assertEqual(db_args[0], self.topic.topic, "Wrong DB topic")
+        self.assertEqual(did2, did, "Wrong DB VNFD id")
+        self.assertIsNotNone(db_args[1]["_admin"]["created"], "Wrong creation time")
+        self.assertEqual(
+            db_args[1]["_admin"]["modified"],
+            db_args[1]["_admin"]["created"],
+            "Wrong modification time",
+        )
+        self.assertEqual(
+            db_args[1]["_admin"]["projects_read"],
+            [test_pid],
+            "Wrong read-only project list",
+        )
+        self.assertEqual(
+            db_args[1]["_admin"]["projects_write"],
+            [test_pid],
+            "Wrong read-write project list",
+        )
+
+        self.db.get_one.side_effect = [
+            {"_id": did, "_admin": deepcopy(db_vnfd_content["_admin"])},
+            None,
+        ]
+
+        self.topic.upload_content(
+            fake_session, did, test_vnfd, {}, {"Content-Type": []}
+        )
+        msg_args = self.msg.write.call_args[0]
+        test_vnfd["_id"] = did
+        self.assertEqual(msg_args[0], self.topic.topic_msg, "Wrong message topic")
+        self.assertEqual(msg_args[1], "edited", "Wrong message action")
+        self.assertEqual(msg_args[2], test_vnfd, "Wrong message content")
+
+        db_args = self.db.get_one.mock_calls[0][1]
+        self.assertEqual(db_args[0], self.topic.topic, "Wrong DB topic")
+        self.assertEqual(db_args[1]["_id"], did, "Wrong DB VNFD id")
+
+        db_args = self.db.replace.call_args[0]
+        self.assertEqual(db_args[0], self.topic.topic, "Wrong DB topic")
+        self.assertEqual(db_args[1], did, "Wrong DB VNFD id")
+
+        admin = db_args[2]["_admin"]
+        db_admin = deepcopy(db_vnfd_content["_admin"])
+        self.assertEqual(admin["type"], "vnfd", "Wrong descriptor type")
+        self.assertEqual(admin["created"], db_admin["created"], "Wrong creation time")
+        self.assertGreater(
+            admin["modified"], db_admin["created"], "Wrong modification time"
+        )
+        self.assertEqual(
+            admin["projects_read"],
+            db_admin["projects_read"],
+            "Wrong read-only project list",
+        )
+        self.assertEqual(
+            admin["projects_write"],
+            db_admin["projects_write"],
+            "Wrong read-write project list",
+        )
+        self.assertEqual(
+            admin["onboardingState"], "ONBOARDED", "Wrong onboarding state"
+        )
+        self.assertEqual(
+            admin["operationalState"], "ENABLED", "Wrong operational state"
+        )
+        self.assertEqual(admin["usageState"], "NOT_IN_USE", "Wrong usage state")
+
+        storage = admin["storage"]
+        self.assertEqual(storage["folder"], did + ":1", "Wrong storage folder")
+        self.assertEqual(storage["descriptor"], "package", "Wrong storage descriptor")
+        self.assertEqual(admin["revision"], 1, "Wrong revision number")
+        compare_desc(self, test_vnfd, db_args[2], "VNFD")
+
+    @patch("osm_nbi.descriptor_topics.shutil")
+    @patch("osm_nbi.descriptor_topics.os.rename")
+    def test_new_vnfd_exploit(self, mock_rename, mock_shutil):
+        id, test_vnfd = self.prepare_vnfd(vnfd_exploit_text)
+
+        with self.assertRaises(EngineException):
+            self.topic.upload_content(
+                fake_session, id, test_vnfd, {}, {"Content-Type": []}
+            )
+
+    @patch("osm_nbi.descriptor_topics.shutil")
+    @patch("osm_nbi.descriptor_topics.os.rename")
+    def test_new_vnfd_valid_helm_chart(self, mock_rename, mock_shutil):
+        id, test_vnfd = self.prepare_vnfd(vnfd_exploit_fixed_text)
+
+        with self.assertNotRaises():
+            self.topic.upload_content(
+                fake_session, id, test_vnfd, {}, {"Content-Type": []}
+            )
+
     @patch("osm_nbi.descriptor_topics.shutil")
     @patch("osm_nbi.descriptor_topics.os.rename")
     def test_new_vnfd(self, mock_rename, mock_shutil):
@@ -189,7 +360,7 @@ class Test_VnfdTopic(TestCase):
                 )
                 self.assertEqual(admin["usageState"], "NOT_IN_USE", "Wrong usage state")
                 storage = admin["storage"]
-                self.assertEqual(storage["folder"], did, "Wrong storage folder")
+                self.assertEqual(storage["folder"], did + ":1", "Wrong storage folder")
                 self.assertEqual(
                     storage["descriptor"], "package", "Wrong storage descriptor"
                 )
@@ -294,7 +465,6 @@ class Test_VnfdTopic(TestCase):
                 self.topic.upload_content(
                     fake_session, did, test_vnfd, {}, {"Content-Type": []}
                 )
-            print(str(e.exception))
             self.assertEqual(
                 e.exception.http_code, HTTPStatus.BAD_REQUEST, "Wrong HTTP status code"
             )
@@ -802,6 +972,52 @@ class Test_VnfdTopic(TestCase):
             self.fs.file_delete.assert_not_called()
         return
 
+    @patch("osm_nbi.descriptor_topics.yaml")
+    def test_validate_descriptor_changes(self, mock_yaml):
+        descriptor_name = "test_descriptor"
+        self.fs.file_open.side_effect = lambda path, mode: open(
+            "/tmp/" + str(uuid4()), "a+b"
+        )
+        with self.subTest(i=1, t="VNFD has changes in day1-2 config primitive"):
+            old_vnfd, new_vnfd = self.create_desc_temp(db_vnfd_content)
+            new_vnfd["df"][0]["lcm-operations-configuration"]["operate-vnf-op-config"][
+                "day1-2"
+            ][0]["config-primitive"][0]["name"] = "new_action"
+            mock_yaml.load.side_effect = [old_vnfd, new_vnfd]
+            with self.assertNotRaises(EngineException):
+                self.topic._validate_descriptor_changes(
+                    descriptor_name, "/tmp/", "/tmp:1/"
+                )
+        with self.subTest(i=2, t="VNFD sw version changed"):
+            # old vnfd uses the default software version: 1.0
+            new_vnfd["software-version"] = "1.3"
+            new_vnfd["sw-image-desc"][0]["name"] = "new-image"
+            mock_yaml.load.side_effect = [old_vnfd, new_vnfd]
+            with self.assertNotRaises(EngineException):
+                self.topic._validate_descriptor_changes(
+                    descriptor_name, "/tmp/", "/tmp:1/"
+                )
+        with self.subTest(
+            i=3, t="VNFD sw version is not changed and mgmt-cp has changed"
+        ):
+            old_vnfd, new_vnfd = self.create_desc_temp(db_vnfd_content)
+            new_vnfd["mgmt-cp"] = "new-mgmt-cp"
+            mock_yaml.load.side_effect = [old_vnfd, new_vnfd]
+            with self.assertRaises(EngineException) as e:
+                self.topic._validate_descriptor_changes(
+                    descriptor_name, "/tmp/", "/tmp:1/"
+                )
+            self.assertEqual(
+                e.exception.http_code,
+                HTTPStatus.UNPROCESSABLE_ENTITY,
+                "Wrong HTTP status code",
+            )
+            self.assertIn(
+                norm("there are disallowed changes in the vnf descriptor"),
+                norm(str(e.exception)),
+                "Wrong exception text",
+            )
+
     def test_validate_mgmt_interface_connection_point_on_valid_descriptor(self):
         indata = deepcopy(db_vnfd_content)
         self.topic.validate_mgmt_interface_connection_point(indata)
@@ -1247,14 +1463,26 @@ class Test_NsdTopic(TestCase):
         self.topic = NsdTopic(self.db, self.fs, self.msg, self.auth)
         self.topic.check_quota = Mock(return_value=None)  # skip quota
 
+    @contextmanager
+    def assertNotRaises(self, exception_type):
+        try:
+            yield None
+        except exception_type:
+            raise self.failureException("{} raised".format(exception_type.__name__))
+
+    def create_desc_temp(self, template):
+        old_desc = deepcopy(template)
+        new_desc = deepcopy(template)
+        return old_desc, new_desc
+
     @patch("osm_nbi.descriptor_topics.shutil")
     @patch("osm_nbi.descriptor_topics.os.rename")
     def test_new_nsd(self, mock_rename, mock_shutil):
         did = db_nsd_content["_id"]
         self.fs.get_params.return_value = {}
         self.fs.file_exists.return_value = False
-        self.fs.file_open.side_effect = lambda path, mode: open(
-            "/tmp/" + str(uuid4()), "a+b"
+        self.fs.file_open.side_effect = lambda path, mode: tempfile.TemporaryFile(
+            mode="a+b"
         )
         test_nsd = deepcopy(db_nsd_content)
         del test_nsd["_id"]
@@ -1336,7 +1564,7 @@ class Test_NsdTopic(TestCase):
                 )
                 self.assertEqual(admin["usageState"], "NOT_IN_USE", "Wrong usage state")
                 storage = admin["storage"]
-                self.assertEqual(storage["folder"], did, "Wrong storage folder")
+                self.assertEqual(storage["folder"], did + ":1", "Wrong storage folder")
                 self.assertEqual(
                     storage["descriptor"], "package", "Wrong storage descriptor"
                 )
@@ -1732,6 +1960,46 @@ class Test_NsdTopic(TestCase):
             self.fs.file_delete.assert_not_called()
         return
 
+    @patch("osm_nbi.descriptor_topics.yaml")
+    def test_validate_descriptor_changes(self, mock_yaml):
+        descriptor_name = "test_ns_descriptor"
+        self.fs.file_open.side_effect = lambda path, mode: open(
+            "/tmp/" + str(uuid4()), "a+b"
+        )
+        with self.subTest(
+            i=1, t="NSD has changes in ns-configuration:config-primitive"
+        ):
+            old_nsd, new_nsd = self.create_desc_temp(db_nsd_content)
+            old_nsd.update(
+                {"ns-configuration": {"config-primitive": [{"name": "add-user"}]}}
+            )
+            new_nsd.update(
+                {"ns-configuration": {"config-primitive": [{"name": "del-user"}]}}
+            )
+            mock_yaml.load.side_effect = [old_nsd, new_nsd]
+            with self.assertNotRaises(EngineException):
+                self.topic._validate_descriptor_changes(
+                    descriptor_name, "/tmp", "/tmp:1"
+                )
+        with self.subTest(i=2, t="NSD name has changed"):
+            old_nsd, new_nsd = self.create_desc_temp(db_nsd_content)
+            new_nsd["name"] = "nscharm-ns2"
+            mock_yaml.load.side_effect = [old_nsd, new_nsd]
+            with self.assertRaises(EngineException) as e:
+                self.topic._validate_descriptor_changes(
+                    descriptor_name, "/tmp", "/tmp:1"
+                )
+            self.assertEqual(
+                e.exception.http_code,
+                HTTPStatus.UNPROCESSABLE_ENTITY,
+                "Wrong HTTP status code",
+            )
+            self.assertIn(
+                norm("there are disallowed changes in the ns descriptor"),
+                norm(str(e.exception)),
+                "Wrong exception text",
+            )
+
     def test_validate_vld_mgmt_network_with_virtual_link_protocol_data_on_valid_descriptor(
         self,
     ):