From: Pedro Escaleira Date: Thu, 15 Jun 2023 14:43:33 +0000 (+0200) Subject: Fix bug 2088 by validating helm-chart value on VNF X-Git-Tag: v12.0.8^0 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=refs%2Fheads%2Fv12.0;p=osm%2FNBI.git Fix bug 2088 by validating helm-chart value on VNF CVE-2022-35503 Change-Id: I1dbb51d60fe5eef7c29704a2c13f6d188751012d Signed-off-by: Pedro Escaleira --- diff --git a/osm_nbi/descriptor_topics.py b/osm_nbi/descriptor_topics.py index 8f3529c..2eb6e9d 100644 --- a/osm_nbi/descriptor_topics.py +++ b/osm_nbi/descriptor_topics.py @@ -20,6 +20,7 @@ import copy import os import shutil import functools +import re # import logging from deepdiff import DeepDiff @@ -46,11 +47,14 @@ from osm_nbi import utils __author__ = "Alfonso Tierno " +valid_helm_chart_re = re.compile( + r"^[a-z0-9]([-a-z0-9]*[a-z0-9]/)?([a-z0-9]([-a-z0-9]*[a-z0-9])?)*$" +) + class DescriptorTopic(BaseTopic): def __init__(self, db, fs, msg, auth): - - BaseTopic.__init__(self, db, fs, msg, auth) + super().__init__(db, fs, msg, auth) def check_conflict_on_edit(self, session, final_content, edit_content, _id): final_content = super().check_conflict_on_edit( @@ -838,9 +842,23 @@ class VnfdTopic(DescriptorTopic): self.validate_internal_virtual_links(indata) self.validate_monitoring_params(indata) self.validate_scaling_group_descriptor(indata) + self.validate_helm_chart(indata) return indata + @staticmethod + def validate_helm_chart(indata): + kdus = indata.get("kdu", []) + for kdu in kdus: + helm_chart_value = kdu.get("helm-chart") + if not helm_chart_value: + continue + if not valid_helm_chart_re.match(helm_chart_value): + raise EngineException( + "helm-chart '{}' is not valid".format(helm_chart_value), + http_code=HTTPStatus.UNPROCESSABLE_ENTITY, + ) + @staticmethod def validate_mgmt_interface_connection_point(indata): if not indata.get("vdu"): @@ -1360,7 +1378,7 @@ class NsdTopic(DescriptorTopic): topic_msg = "nsd" def __init__(self, db, fs, msg, auth): - DescriptorTopic.__init__(self, db, fs, msg, auth) + super().__init__(db, fs, msg, auth) def pyangbind_validation(self, item, data, force=False): if self._descriptor_data_is_in_old_format(data): diff --git a/osm_nbi/tests/test_descriptor_topics.py b/osm_nbi/tests/test_descriptor_topics.py index 147f3e7..00b18a5 100755 --- a/osm_nbi/tests/test_descriptor_topics.py +++ b/osm_nbi/tests/test_descriptor_topics.py @@ -27,11 +27,21 @@ from copy import deepcopy from time import time from osm_common import dbbase, fsbase, msgbase from osm_nbi import authconn -from osm_nbi.tests.test_pkg_descriptors import db_vnfds_text, db_nsds_text +from osm_nbi.tests.test_pkg_descriptors import ( + db_vnfds_text, + db_nsds_text, + vnfd_exploit_text, + vnfd_exploit_fixed_text, +) from osm_nbi.descriptor_topics import VnfdTopic, NsdTopic from osm_nbi.engine import EngineException from osm_common.dbbase import DbException import yaml +import tempfile +import collections +import collections.abc + +collections.MutableSequence = collections.abc.MutableSequence test_name = "test-user" db_vnfd_content = yaml.load(db_vnfds_text, Loader=yaml.Loader)[0] @@ -46,11 +56,23 @@ fake_session = { "public": False, "allow_show_user_project_role": True, } +UUID = "00000000-0000-0000-0000-000000000000" + + +def admin_value(): + return {"projects_read": []} + + +def setup_mock_fs(fs): + fs.path = "" + fs.get_params.return_value = {} + fs.file_exists.return_value = False + fs.file_open.side_effect = lambda path, mode: tempfile.TemporaryFile(mode="a+b") -def norm(str): +def norm(s: str): """Normalize string for checking""" - return " ".join(str.strip().split()).lower() + return " ".join(s.strip().split()).lower() def compare_desc(tc, d1, d2, k): @@ -94,7 +116,7 @@ class Test_VnfdTopic(TestCase): self.topic.check_quota = Mock(return_value=None) # skip quota @contextmanager - def assertNotRaises(self, exception_type): + def assertNotRaises(self, exception_type=Exception): try: yield None except exception_type: @@ -105,6 +127,142 @@ class Test_VnfdTopic(TestCase): new_desc = deepcopy(template) return old_desc, new_desc + def prepare_vnfd_creation(self): + setup_mock_fs(self.fs) + test_vnfd = deepcopy(db_vnfd_content) + did = db_vnfd_content["_id"] + self.db.create.return_value = did + self.db.get_one.side_effect = [ + {"_id": did, "_admin": deepcopy(db_vnfd_content["_admin"])}, + None, + ] + return did, test_vnfd + + def prepare_vnfd(self, vnfd_text): + setup_mock_fs(self.fs) + test_vnfd = yaml.safe_load(vnfd_text) + self.db.create.return_value = UUID + self.db.get_one.side_effect = [ + {"_id": UUID, "_admin": admin_value()}, + None, + ] + return UUID, test_vnfd + + def prepare_test_vnfd(self, test_vnfd): + del test_vnfd["_id"] + del test_vnfd["_admin"] + del test_vnfd["vdu"][0]["cloud-init-file"] + del test_vnfd["df"][0]["lcm-operations-configuration"]["operate-vnf-op-config"][ + "day1-2" + ][0]["execution-environment-list"][0]["juju"] + return test_vnfd + + @patch("osm_nbi.descriptor_topics.shutil") + @patch("osm_nbi.descriptor_topics.os.rename") + def test_new_vnfd_normal_creation(self, mock_rename, mock_shutil): + did, test_vnfd = self.prepare_vnfd_creation() + test_vnfd = self.prepare_test_vnfd(test_vnfd) + rollback = [] + did2, oid = self.topic.new(rollback, fake_session, {}) + db_args = self.db.create.call_args[0] + msg_args = self.msg.write.call_args[0] + + self.assertEqual(len(rollback), 1, "Wrong rollback length") + self.assertEqual(msg_args[0], self.topic.topic_msg, "Wrong message topic") + self.assertEqual(msg_args[1], "created", "Wrong message action") + self.assertEqual(msg_args[2], {"_id": did}, "Wrong message content") + self.assertEqual(db_args[0], self.topic.topic, "Wrong DB topic") + self.assertEqual(did2, did, "Wrong DB VNFD id") + self.assertIsNotNone(db_args[1]["_admin"]["created"], "Wrong creation time") + self.assertEqual( + db_args[1]["_admin"]["modified"], + db_args[1]["_admin"]["created"], + "Wrong modification time", + ) + self.assertEqual( + db_args[1]["_admin"]["projects_read"], + [test_pid], + "Wrong read-only project list", + ) + self.assertEqual( + db_args[1]["_admin"]["projects_write"], + [test_pid], + "Wrong read-write project list", + ) + + self.db.get_one.side_effect = [ + {"_id": did, "_admin": deepcopy(db_vnfd_content["_admin"])}, + None, + ] + + self.topic.upload_content( + fake_session, did, test_vnfd, {}, {"Content-Type": []} + ) + msg_args = self.msg.write.call_args[0] + test_vnfd["_id"] = did + self.assertEqual(msg_args[0], self.topic.topic_msg, "Wrong message topic") + self.assertEqual(msg_args[1], "edited", "Wrong message action") + self.assertEqual(msg_args[2], test_vnfd, "Wrong message content") + + db_args = self.db.get_one.mock_calls[0][1] + self.assertEqual(db_args[0], self.topic.topic, "Wrong DB topic") + self.assertEqual(db_args[1]["_id"], did, "Wrong DB VNFD id") + + db_args = self.db.replace.call_args[0] + self.assertEqual(db_args[0], self.topic.topic, "Wrong DB topic") + self.assertEqual(db_args[1], did, "Wrong DB VNFD id") + + admin = db_args[2]["_admin"] + db_admin = deepcopy(db_vnfd_content["_admin"]) + self.assertEqual(admin["type"], "vnfd", "Wrong descriptor type") + self.assertEqual(admin["created"], db_admin["created"], "Wrong creation time") + self.assertGreater( + admin["modified"], db_admin["created"], "Wrong modification time" + ) + self.assertEqual( + admin["projects_read"], + db_admin["projects_read"], + "Wrong read-only project list", + ) + self.assertEqual( + admin["projects_write"], + db_admin["projects_write"], + "Wrong read-write project list", + ) + self.assertEqual( + admin["onboardingState"], "ONBOARDED", "Wrong onboarding state" + ) + self.assertEqual( + admin["operationalState"], "ENABLED", "Wrong operational state" + ) + self.assertEqual(admin["usageState"], "NOT_IN_USE", "Wrong usage state") + + storage = admin["storage"] + self.assertEqual(storage["folder"], did + ":1", "Wrong storage folder") + self.assertEqual(storage["descriptor"], "package", "Wrong storage descriptor") + self.assertEqual(admin["revision"], 1, "Wrong revision number") + compare_desc(self, test_vnfd, db_args[2], "VNFD") + + @patch("osm_nbi.descriptor_topics.shutil") + @patch("osm_nbi.descriptor_topics.os.rename") + def test_new_vnfd_exploit(self, mock_rename, mock_shutil): + id, test_vnfd = self.prepare_vnfd(vnfd_exploit_text) + + with self.assertRaises(EngineException): + self.topic.upload_content( + fake_session, id, test_vnfd, {}, {"Content-Type": []} + ) + + @patch("osm_nbi.descriptor_topics.shutil") + @patch("osm_nbi.descriptor_topics.os.rename") + def test_new_vnfd_valid_helm_chart(self, mock_rename, mock_shutil): + id, test_vnfd = self.prepare_vnfd(vnfd_exploit_fixed_text) + + with self.assertNotRaises(): + self.topic.upload_content( + fake_session, id, test_vnfd, {}, {"Content-Type": []} + ) + @patch("osm_nbi.descriptor_topics.shutil") @patch("osm_nbi.descriptor_topics.os.rename") def test_new_vnfd(self, mock_rename, mock_shutil): @@ -1323,8 +1481,8 @@ class Test_NsdTopic(TestCase): did = db_nsd_content["_id"] self.fs.get_params.return_value = {} self.fs.file_exists.return_value = False - self.fs.file_open.side_effect = lambda path, mode: open( - "/tmp/" + str(uuid4()), "a+b" + self.fs.file_open.side_effect = lambda path, mode: tempfile.TemporaryFile( + mode="a+b" ) test_nsd = deepcopy(db_nsd_content) del test_nsd["_id"] diff --git a/osm_nbi/tests/test_pkg_descriptors.py b/osm_nbi/tests/test_pkg_descriptors.py index 91e5641..39c28fe 100644 --- a/osm_nbi/tests/test_pkg_descriptors.py +++ b/osm_nbi/tests/test_pkg_descriptors.py @@ -20,6 +20,30 @@ __author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com" __date__ = "2019-11-20" + +# Exploit exists in the key kdu.helm-chart +vnfd_exploit_text = """ + _id: 00000000-0000-0000-0000-000000000000 + id: n2vc-rce_vnfd + df: + - id: default-df + kdu: + - name: exploit + helm-chart: "local/exploit --post-renderer /bin/bash" + helm-version: v3 +""" + +# Exploit in kdu.helm-chart is fixed +vnfd_exploit_fixed_text = """ + id: n2vc-rce_vnfd + df: + - id: default-df + kdu: + - name: exploit + helm-chart: "local/exploit" + helm-version: v3 +""" + db_vnfds_text = """ --- - _admin: @@ -49,7 +73,7 @@ db_vnfds_text = """ product-name: hackfest3charmed-vnf version: '1.0' mgmt-cp: vnf-mgmt-ext - + virtual-compute-desc: - id: mgmt-compute virtual-cpu: @@ -61,17 +85,17 @@ db_vnfds_text = """ num-virtual-cpu: 2 virtual-memory: size: '2' - + virtual-storage-desc: - id: mgmt-storage size-of-storage: '20' - id: data-storage size-of-storage: '20' - + sw-image-desc: - id: hackfest3-mgmt name: hackfest3-mgmt - + vdu: - id: mgmtVM name: mgmtVM @@ -118,10 +142,10 @@ db_vnfds_text = """ - id: dataVM_cpu_util name: dataVM_cpu_util performance-metric: cpu_utilization - + int-virtual-link-desc: - id: internal - + ext-cpd: - id: vnf-mgmt-ext int-cpd: # Connection to int-cpd @@ -131,7 +155,7 @@ db_vnfds_text = """ int-cpd: # Connection to int-cpd vdu-id: dataVM cpd: vnf-data - + df: - id: hackfest_default vdu-profile: