Fix bug 2088 by validating helm-chart value on VNF
CVE-2022-35503
Change-Id: I1dbb51d60fe5eef7c29704a2c13f6d188751012d
Signed-off-by: Pedro Escaleira <escaleira@av.it.pt>
diff --git a/osm_nbi/descriptor_topics.py b/osm_nbi/descriptor_topics.py
index 8f3529c..2eb6e9d 100644
--- a/osm_nbi/descriptor_topics.py
+++ b/osm_nbi/descriptor_topics.py
@@ -20,6 +20,7 @@
import os
import shutil
import functools
+import re
# import logging
from deepdiff import DeepDiff
@@ -46,11 +47,14 @@
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+valid_helm_chart_re = re.compile(  # optional "<prefix>/" part, then the name part
+    r"^[a-z0-9]([-a-z0-9]*[a-z0-9]/)?([a-z0-9]([-a-z0-9]*[a-z0-9])?)?\Z"
+)  # last group is "?" not "*": same matches, no exponential backtracking; \Z refuses "\n"
+
class DescriptorTopic(BaseTopic):
def __init__(self, db, fs, msg, auth):
-
- BaseTopic.__init__(self, db, fs, msg, auth)
+ super().__init__(db, fs, msg, auth)
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
final_content = super().check_conflict_on_edit(
@@ -838,10 +842,24 @@
self.validate_internal_virtual_links(indata)
self.validate_monitoring_params(indata)
self.validate_scaling_group_descriptor(indata)
+ self.validate_helm_chart(indata)
return indata
@staticmethod
+    def validate_helm_chart(indata):
+        """Raise EngineException (422) for any kdu whose "helm-chart" is malformed."""
+        for kdu in indata.get("kdu") or ():  # "kdu" may be absent or None
+            chart = kdu.get("helm-chart")
+            # falsy: not set, skip; non-str: invalid (no TypeError); fullmatch refuses "x\n"
+            valid = isinstance(chart, str) and valid_helm_chart_re.fullmatch(chart)
+            if chart and not valid:
+                raise EngineException(
+                    "helm-chart '{}' is not valid".format(chart),
+                    http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+                )
+
+ @staticmethod
def validate_mgmt_interface_connection_point(indata):
if not indata.get("vdu"):
return
@@ -1360,7 +1378,7 @@
topic_msg = "nsd"
def __init__(self, db, fs, msg, auth):
- DescriptorTopic.__init__(self, db, fs, msg, auth)
+ super().__init__(db, fs, msg, auth)
def pyangbind_validation(self, item, data, force=False):
if self._descriptor_data_is_in_old_format(data):
diff --git a/osm_nbi/tests/test_descriptor_topics.py b/osm_nbi/tests/test_descriptor_topics.py
index 147f3e7..00b18a5 100755
--- a/osm_nbi/tests/test_descriptor_topics.py
+++ b/osm_nbi/tests/test_descriptor_topics.py
@@ -27,11 +27,21 @@
from time import time
from osm_common import dbbase, fsbase, msgbase
from osm_nbi import authconn
-from osm_nbi.tests.test_pkg_descriptors import db_vnfds_text, db_nsds_text
+from osm_nbi.tests.test_pkg_descriptors import (
+ db_vnfds_text,
+ db_nsds_text,
+ vnfd_exploit_text,
+ vnfd_exploit_fixed_text,
+)
from osm_nbi.descriptor_topics import VnfdTopic, NsdTopic
from osm_nbi.engine import EngineException
from osm_common.dbbase import DbException
import yaml
+import tempfile
+import collections
+import collections.abc
+
+collections.MutableSequence = collections.abc.MutableSequence  # alias gone since Python 3.10; presumably needed by a dependency - TODO confirm
test_name = "test-user"
db_vnfd_content = yaml.load(db_vnfds_text, Loader=yaml.Loader)[0]
@@ -46,11 +56,23 @@
"public": False,
"allow_show_user_project_role": True,
}
+UUID = "00000000-0000-0000-0000-000000000000"
-def norm(str):
+def admin_value():
+ return {"projects_read": []}
+
+
+def setup_mock_fs(fs):
+ fs.path = ""
+ fs.get_params.return_value = {}
+ fs.file_exists.return_value = False
+ fs.file_open.side_effect = lambda path, mode: tempfile.TemporaryFile(mode="a+b")
+
+
+def norm(s: str):
"""Normalize string for checking"""
- return " ".join(str.strip().split()).lower()
+ return " ".join(s.strip().split()).lower()
def compare_desc(tc, d1, d2, k):
@@ -94,7 +116,7 @@
self.topic.check_quota = Mock(return_value=None) # skip quota
@contextmanager
- def assertNotRaises(self, exception_type):
+ def assertNotRaises(self, exception_type=Exception):
try:
yield None
except exception_type:
@@ -105,6 +127,142 @@
new_desc = deepcopy(template)
return old_desc, new_desc
+ def prepare_vnfd_creation(self):
+ setup_mock_fs(self.fs)
+ test_vnfd = deepcopy(db_vnfd_content)
+ did = db_vnfd_content["_id"]
+ self.db.create.return_value = did
+ self.db.get_one.side_effect = [
+ {"_id": did, "_admin": deepcopy(db_vnfd_content["_admin"])},
+ None,
+ ]
+ return did, test_vnfd
+
+ def prepare_vnfd(self, vnfd_text):
+ setup_mock_fs(self.fs)
+ test_vnfd = yaml.safe_load(vnfd_text)
+ self.db.create.return_value = UUID
+ self.db.get_one.side_effect = [
+ {"_id": UUID, "_admin": admin_value()},
+ None,
+ ]
+ return UUID, test_vnfd
+
+ def prepare_test_vnfd(self, test_vnfd):
+ del test_vnfd["_id"]
+ del test_vnfd["_admin"]
+ del test_vnfd["vdu"][0]["cloud-init-file"]
+ del test_vnfd["df"][0]["lcm-operations-configuration"]["operate-vnf-op-config"][
+ "day1-2"
+ ][0]["execution-environment-list"][0]["juju"]
+ return test_vnfd
+
+ @patch("osm_nbi.descriptor_topics.shutil")
+ @patch("osm_nbi.descriptor_topics.os.rename")
+ def test_new_vnfd_normal_creation(self, mock_rename, mock_shutil):
+ did, test_vnfd = self.prepare_vnfd_creation()
+ test_vnfd = self.prepare_test_vnfd(test_vnfd)
+ rollback = []
+ did2, oid = self.topic.new(rollback, fake_session, {})
+ db_args = self.db.create.call_args[0]
+ msg_args = self.msg.write.call_args[0]
+
+ self.assertEqual(len(rollback), 1, "Wrong rollback length")
+ self.assertEqual(msg_args[0], self.topic.topic_msg, "Wrong message topic")
+ self.assertEqual(msg_args[1], "created", "Wrong message action")
+ self.assertEqual(msg_args[2], {"_id": did}, "Wrong message content")
+ self.assertEqual(db_args[0], self.topic.topic, "Wrong DB topic")
+ self.assertEqual(did2, did, "Wrong DB VNFD id")
+ self.assertIsNotNone(db_args[1]["_admin"]["created"], "Wrong creation time")
+ self.assertEqual(
+ db_args[1]["_admin"]["modified"],
+ db_args[1]["_admin"]["created"],
+ "Wrong modification time",
+ )
+ self.assertEqual(
+ db_args[1]["_admin"]["projects_read"],
+ [test_pid],
+ "Wrong read-only project list",
+ )
+ self.assertEqual(
+ db_args[1]["_admin"]["projects_write"],
+ [test_pid],
+ "Wrong read-write project list",
+ )
+
+ self.db.get_one.side_effect = [
+ {"_id": did, "_admin": deepcopy(db_vnfd_content["_admin"])},
+ None,
+ ]
+
+ self.topic.upload_content(
+ fake_session, did, test_vnfd, {}, {"Content-Type": []}
+ )
+ msg_args = self.msg.write.call_args[0]
+ test_vnfd["_id"] = did
+ self.assertEqual(msg_args[0], self.topic.topic_msg, "Wrong message topic")
+ self.assertEqual(msg_args[1], "edited", "Wrong message action")
+ self.assertEqual(msg_args[2], test_vnfd, "Wrong message content")
+
+ db_args = self.db.get_one.mock_calls[0][1]
+ self.assertEqual(db_args[0], self.topic.topic, "Wrong DB topic")
+ self.assertEqual(db_args[1]["_id"], did, "Wrong DB VNFD id")
+
+ db_args = self.db.replace.call_args[0]
+ self.assertEqual(db_args[0], self.topic.topic, "Wrong DB topic")
+ self.assertEqual(db_args[1], did, "Wrong DB VNFD id")
+
+ admin = db_args[2]["_admin"]
+ db_admin = deepcopy(db_vnfd_content["_admin"])
+ self.assertEqual(admin["type"], "vnfd", "Wrong descriptor type")
+ self.assertEqual(admin["created"], db_admin["created"], "Wrong creation time")
+ self.assertGreater(
+ admin["modified"], db_admin["created"], "Wrong modification time"
+ )
+ self.assertEqual(
+ admin["projects_read"],
+ db_admin["projects_read"],
+ "Wrong read-only project list",
+ )
+ self.assertEqual(
+ admin["projects_write"],
+ db_admin["projects_write"],
+ "Wrong read-write project list",
+ )
+ self.assertEqual(
+ admin["onboardingState"], "ONBOARDED", "Wrong onboarding state"
+ )
+ self.assertEqual(
+ admin["operationalState"], "ENABLED", "Wrong operational state"
+ )
+ self.assertEqual(admin["usageState"], "NOT_IN_USE", "Wrong usage state")
+
+ storage = admin["storage"]
+ self.assertEqual(storage["folder"], did + ":1", "Wrong storage folder")
+ self.assertEqual(storage["descriptor"], "package", "Wrong storage descriptor")
+ self.assertEqual(admin["revision"], 1, "Wrong revision number")
+ compare_desc(self, test_vnfd, db_args[2], "VNFD")
+
+    @patch("osm_nbi.descriptor_topics.shutil")
+    @patch("osm_nbi.descriptor_topics.os.rename")
+    def test_new_vnfd_exploit(self, mock_rename, mock_shutil):
+        """Uploading the vnfd_exploit_text descriptor must raise EngineException."""
+        did, test_vnfd = self.prepare_vnfd(vnfd_exploit_text)
+        with self.assertRaises(EngineException):
+            self.topic.upload_content(
+                fake_session, did, test_vnfd, {}, {"Content-Type": []}
+            )
+
+    @patch("osm_nbi.descriptor_topics.shutil")
+    @patch("osm_nbi.descriptor_topics.os.rename")
+    def test_new_vnfd_valid_helm_chart(self, mock_rename, mock_shutil):
+        """Uploading the vnfd_exploit_fixed_text descriptor must not raise."""
+        did, test_vnfd = self.prepare_vnfd(vnfd_exploit_fixed_text)
+        with self.assertNotRaises():
+            self.topic.upload_content(
+                fake_session, did, test_vnfd, {}, {"Content-Type": []}
+            )
+
@patch("osm_nbi.descriptor_topics.shutil")
@patch("osm_nbi.descriptor_topics.os.rename")
def test_new_vnfd(self, mock_rename, mock_shutil):
@@ -1323,8 +1481,8 @@
did = db_nsd_content["_id"]
self.fs.get_params.return_value = {}
self.fs.file_exists.return_value = False
- self.fs.file_open.side_effect = lambda path, mode: open(
- "/tmp/" + str(uuid4()), "a+b"
+ self.fs.file_open.side_effect = lambda path, mode: tempfile.TemporaryFile(
+ mode="a+b"
)
test_nsd = deepcopy(db_nsd_content)
del test_nsd["_id"]
diff --git a/osm_nbi/tests/test_pkg_descriptors.py b/osm_nbi/tests/test_pkg_descriptors.py
index 91e5641..39c28fe 100644
--- a/osm_nbi/tests/test_pkg_descriptors.py
+++ b/osm_nbi/tests/test_pkg_descriptors.py
@@ -20,6 +20,30 @@
__author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
__date__ = "2019-11-20"
+
+# Exploit exists in the key kdu.helm-chart
+vnfd_exploit_text = """
+ _id: 00000000-0000-0000-0000-000000000000
+ id: n2vc-rce_vnfd
+ df:
+ - id: default-df
+ kdu:
+ - name: exploit
+ helm-chart: "local/exploit --post-renderer /bin/bash"
+ helm-version: v3
+"""
+
+# Exploit in kdu.helm-chart is fixed
+vnfd_exploit_fixed_text = """
+ id: n2vc-rce_vnfd
+ df:
+ - id: default-df
+ kdu:
+ - name: exploit
+ helm-chart: "local/exploit"
+ helm-version: v3
+"""
+
db_vnfds_text = """
---
- _admin:
@@ -49,7 +73,7 @@
product-name: hackfest3charmed-vnf
version: '1.0'
mgmt-cp: vnf-mgmt-ext
-
+
virtual-compute-desc:
- id: mgmt-compute
virtual-cpu:
@@ -61,17 +85,17 @@
num-virtual-cpu: 2
virtual-memory:
size: '2'
-
+
virtual-storage-desc:
- id: mgmt-storage
size-of-storage: '20'
- id: data-storage
size-of-storage: '20'
-
+
sw-image-desc:
- id: hackfest3-mgmt
name: hackfest3-mgmt
-
+
vdu:
- id: mgmtVM
name: mgmtVM
@@ -118,10 +142,10 @@
- id: dataVM_cpu_util
name: dataVM_cpu_util
performance-metric: cpu_utilization
-
+
int-virtual-link-desc:
- id: internal
-
+
ext-cpd:
- id: vnf-mgmt-ext
int-cpd: # Connection to int-cpd
@@ -131,7 +155,7 @@
int-cpd: # Connection to int-cpd
vdu-id: dataVM
cpd: vnf-data
-
+
df:
- id: hackfest_default
vdu-profile: